From c18fed97461c415ecefce5d72fb71653436710b4 Mon Sep 17 00:00:00 2001 From: Andrew Stucki Date: Mon, 26 Oct 2020 09:17:35 -0400 Subject: [PATCH 1/8] Refactor packetbeat to support agent-based configuration --- packetbeat/beater/packetbeat.go | 275 +++++------------- packetbeat/beater/processor.go | 142 +++++++++ packetbeat/beater/reloader.go | 100 +++++++ packetbeat/beater/setup.go | 72 +++++ packetbeat/beater/worker.go | 75 +++++ packetbeat/config/agent.go | 80 +++++ packetbeat/config/agent_test.go | 49 ++++ packetbeat/config/config.go | 35 +++ packetbeat/flows/flows.go | 5 +- packetbeat/flows/flows_test.go | 3 +- packetbeat/flows/worker.go | 8 +- packetbeat/flows/worker_test.go | 3 +- packetbeat/procs/procs.go | 2 - packetbeat/protos/amqp/amqp.go | 8 +- packetbeat/protos/amqp/amqp_parser.go | 3 +- packetbeat/protos/amqp/amqp_test.go | 3 +- packetbeat/protos/cassandra/cassandra.go | 10 +- packetbeat/protos/cassandra/trans.go | 7 +- packetbeat/protos/dhcpv4/dhcpv4.go | 12 +- packetbeat/protos/dhcpv4/dhcpv4_test.go | 5 +- packetbeat/protos/dns/dns.go | 8 +- packetbeat/protos/dns/dns_tcp.go | 3 +- packetbeat/protos/dns/dns_test.go | 3 +- packetbeat/protos/dns/dns_udp.go | 3 +- packetbeat/protos/http/http.go | 9 +- packetbeat/protos/http/http_test.go | 3 +- packetbeat/protos/icmp/icmp.go | 9 +- packetbeat/protos/icmp/icmp_test.go | 3 +- packetbeat/protos/memcache/memcache.go | 8 +- packetbeat/protos/memcache/memcache_test.go | 3 +- packetbeat/protos/memcache/plugin_tcp.go | 3 +- packetbeat/protos/memcache/plugin_udp.go | 3 +- packetbeat/protos/mongodb/mongodb.go | 9 +- packetbeat/protos/mongodb/mongodb_test.go | 3 +- packetbeat/protos/mysql/mysql.go | 9 +- packetbeat/protos/mysql/mysql_test.go | 3 +- packetbeat/protos/nfs/rpc.go | 2 + packetbeat/protos/pgsql/pgsql.go | 9 +- packetbeat/protos/pgsql/pgsql_test.go | 3 +- packetbeat/protos/protos.go | 20 +- packetbeat/protos/redis/redis.go | 9 +- packetbeat/protos/registry.go | 3 + packetbeat/protos/sip/parser.go | 9 +- packetbeat/protos/sip/plugin.go | 10 +- packetbeat/protos/sip/plugin_test.go | 3 +- packetbeat/protos/tcp/tcp_test.go | 3 +- packetbeat/protos/thrift/thrift.go | 8 +- packetbeat/protos/thrift/thrift_test.go | 3 +- packetbeat/protos/tls/tls.go | 9 +- packetbeat/protos/tls/tls_test.go | 3 +- packetbeat/publish/publish.go | 4 + .../tcp-protocol/{protocol}/trans.go.tmpl | 7 +- .../{protocol}/{protocol}.go.tmpl | 10 +- 53 files changed, 805 insertions(+), 289 deletions(-) create mode 100644 packetbeat/beater/processor.go create mode 100644 packetbeat/beater/reloader.go create mode 100644 packetbeat/beater/setup.go create mode 100644 packetbeat/beater/worker.go create mode 100644 packetbeat/config/agent.go create mode 100644 packetbeat/config/agent_test.go diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index b862bd3ee11..0526e2090d7 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -18,47 +18,25 @@ package beater import ( - "errors" "flag" - "fmt" - "sync" "time" - "github.com/tsg/gopacket/layers" - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/libbeat/processors" + "github.com/elastic/beats/v7/libbeat/management" "github.com/elastic/beats/v7/libbeat/service" "github.com/elastic/beats/v7/packetbeat/config" - "github.com/elastic/beats/v7/packetbeat/decoder" - "github.com/elastic/beats/v7/packetbeat/flows" "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" - "github.com/elastic/beats/v7/packetbeat/protos/icmp" - "github.com/elastic/beats/v7/packetbeat/protos/tcp" - "github.com/elastic/beats/v7/packetbeat/protos/udp" "github.com/elastic/beats/v7/packetbeat/publish" - "github.com/elastic/beats/v7/packetbeat/sniffer" // Add packetbeat default processors _ "github.com/elastic/beats/v7/packetbeat/processor/add_kubernetes_metadata" ) -// Beater object. Contains all objects needed to run the beat -type packetbeat struct { - config config.Config - cmdLineArgs flags - sniff *sniffer.Sniffer - - // publisher/pipeline - pipeline beat.Pipeline - transPub *publish.TransactionPublisher - flows *flows.Flows -} - type flags struct { file *string loop *int @@ -79,8 +57,8 @@ func init() { } } -func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { - config := config.Config{ +func initialConfig() config.Config { + return config.Config{ Interfaces: config.InterfacesConfig{ File: *cmdLineArgs.file, Loop: *cmdLineArgs.loop, @@ -89,111 +67,59 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { Dumpfile: *cmdLineArgs.dumpfile, }, } +} + +// Beater object. Contains all objects needed to run the beat +type packetbeat struct { + config *common.Config + factory *processorFactory + publisher *publish.TransactionPublisher + shutdownTimeout time.Duration + done chan struct{} +} + +func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { + config := initialConfig() err := rawConfig.Unpack(&config) if err != nil { logp.Err("fails to read the beat config: %v, %v", err, config) return nil, err } - pb := &packetbeat{ - config: config, - cmdLineArgs: cmdLineArgs, - } - err = pb.init(b) - if err != nil { - return nil, err - } - - return pb, nil -} - -// init packetbeat components -func (pb *packetbeat) init(b *beat.Beat) error { - var err error - cfg := &pb.config + watcher := procs.ProcessesWatcher{} // Enable the process watcher only if capturing live traffic - if cfg.Interfaces.File == "" { - err = procs.ProcWatcher.Init(cfg.Procs) + if config.Interfaces.File == "" { + err = watcher.Init(config.Procs) if err != nil { logp.Critical(err.Error()) - return err + return nil, err } } else { logp.Info("Process watcher disabled when file input is used") } - pb.pipeline = b.Publisher - pb.transPub, err = publish.NewTransactionPublisher( + publisher, err := publish.NewTransactionPublisher( b.Info.Name, b.Publisher, - pb.config.IgnoreOutgoing, - pb.config.Interfaces.File == "", + config.IgnoreOutgoing, + config.Interfaces.File == "", ) if err != nil { - return err - } - - logp.Debug("main", "Initializing protocol plugins") - err = protos.Protos.Init(false, pb.transPub, cfg.Protocols, cfg.ProtocolsList) - if err != nil { - return fmt.Errorf("Initializing protocol analyzers failed: %v", err) - } - - if err := pb.setupFlows(); err != nil { - return err - } - - return pb.setupSniffer() -} - -func (pb *packetbeat) setupSniffer() error { - config := &pb.config - - icmp, err := pb.icmpConfig() - if err != nil { - return err - } - - withVlans := config.Interfaces.WithVlans - withICMP := icmp.Enabled() - - filter := config.Interfaces.BpfFilter - if filter == "" && !config.Flows.IsEnabled() { - filter = protos.Protos.BpfFilter(withVlans, withICMP) - } - - pb.sniff, err = sniffer.New(false, filter, pb.createWorker, config.Interfaces) - return err -} - -func (pb *packetbeat) setupFlows() error { - config := &pb.config - if !config.Flows.IsEnabled() { - return nil - } - - processors, err := processors.New(config.Flows.Processors) - if err != nil { - return err - } - - client, err := pb.pipeline.ConnectWith(beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - EventMetadata: config.Flows.EventMetadata, - Processor: processors, - KeepNull: config.Flows.KeepNull, - }, - }) - if err != nil { - return err + return nil, err } - pb.flows, err = flows.NewFlows(client.PublishAll, config.Flows) - if err != nil { - return err + factory := newProcessorFactory(b.Info.Name, make(chan error, 1), publisher) + if err := factory.CheckConfig(rawConfig); err != nil { + return nil, err } - return nil + return &packetbeat{ + config: rawConfig, + shutdownTimeout: config.ShutdownTimeout, + factory: factory, + publisher: publisher, + done: make(chan struct{}), + }, nil } func (pb *packetbeat) Run(b *beat.Beat) error { @@ -205,114 +131,63 @@ func (pb *packetbeat) Run(b *beat.Beat) error { } }() - defer pb.transPub.Stop() + defer pb.publisher.Stop() - timeout := pb.config.ShutdownTimeout + timeout := pb.shutdownTimeout if timeout > 0 { defer time.Sleep(timeout) } - if pb.flows != nil { - pb.flows.Start() - defer pb.flows.Stop() + if !b.Manager.Enabled() { + return pb.runStatic(b, pb.factory) } + return pb.runManaged(b, pb.factory) +} - var wg sync.WaitGroup - errC := make(chan error, 1) - - // Run the sniffer in background - wg.Add(1) - go func() { - defer wg.Done() +func (pb *packetbeat) runStatic(b *beat.Beat, factory *processorFactory) error { + runner, err := factory.Create(b.Publisher, pb.config) + if err != nil { + return err + } + runner.Start() + defer runner.Stop() - err := pb.sniff.Run() - if err != nil { - errC <- fmt.Errorf("Sniffer main loop failed: %v", err) - } - }() + logp.Debug("main", "Waiting for the runner to finish") - logp.Debug("main", "Waiting for the sniffer to finish") - wg.Wait() select { - default: - case err := <-errC: + case <-pb.done: + case err := <-factory.err: + close(pb.done) return err } - return nil } -// Called by the Beat stop function -func (pb *packetbeat) Stop() { - logp.Info("Packetbeat send stop signal") - pb.sniff.Stop() -} - -func (pb *packetbeat) createWorker(dl layers.LinkType) (sniffer.Worker, error) { - var icmp4 icmp.ICMPv4Processor - var icmp6 icmp.ICMPv6Processor - cfg, err := pb.icmpConfig() - if err != nil { - return nil, err - } - if cfg.Enabled() { - reporter, err := pb.transPub.CreateReporter(cfg) - if err != nil { - return nil, err +func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error { + runner := newReloader(management.DebugK, factory, b.Publisher) + reload.Register.MustRegister(b.Info.Beat, runner) + defer runner.Stop() + + logp.Debug("main", "Waiting for the runner to finish") + + for { + select { + case <-pb.done: + return nil + case err := <-factory.err: + // when we're managed we don't want + // to stop if the sniffer exited without an error + // this would happen during a configuration reload + if err != nil { + close(pb.done) + return err + } } - - icmp, err := icmp.New(false, reporter, cfg) - if err != nil { - return nil, err - } - - icmp4 = icmp - icmp6 = icmp - } - - tcp, err := tcp.NewTCP(&protos.Protos) - if err != nil { - return nil, err - } - - udp, err := udp.NewUDP(&protos.Protos) - if err != nil { - return nil, err } - - worker, err := decoder.New(pb.flows, dl, icmp4, icmp6, tcp, udp) - if err != nil { - return nil, err - } - - return worker, nil } -func (pb *packetbeat) icmpConfig() (*common.Config, error) { - var icmp *common.Config - if pb.config.Protocols["icmp"].Enabled() { - icmp = pb.config.Protocols["icmp"] - } - - for _, cfg := range pb.config.ProtocolsList { - info := struct { - Type string `config:"type" validate:"required"` - }{} - - if err := cfg.Unpack(&info); err != nil { - return nil, err - } - - if info.Type != "icmp" { - continue - } - - if icmp != nil { - return nil, errors.New("More then one icmp configurations found") - } - - icmp = cfg - } - - return icmp, nil +// Called by the Beat stop function +func (pb *packetbeat) Stop() { + logp.Info("Packetbeat send stop signal") + close(pb.done) } diff --git a/packetbeat/beater/processor.go b/packetbeat/beater/processor.go new file mode 100644 index 00000000000..dc9dde1cb5f --- /dev/null +++ b/packetbeat/beater/processor.go @@ -0,0 +1,142 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beater + +import ( + "fmt" + "sync" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/publisher/pipeline" + + "github.com/elastic/beats/v7/packetbeat/flows" + "github.com/elastic/beats/v7/packetbeat/procs" + "github.com/elastic/beats/v7/packetbeat/protos" + "github.com/elastic/beats/v7/packetbeat/publish" + "github.com/elastic/beats/v7/packetbeat/sniffer" +) + +type processor struct { + wg sync.WaitGroup + flows *flows.Flows + sniffer *sniffer.Sniffer + err chan error +} + +func newProcessor(flows *flows.Flows, sniffer *sniffer.Sniffer, err chan error) *processor { + return &processor{ + flows: flows, + sniffer: sniffer, + err: err, + } +} + +func (p *processor) String() string { + return "packetbeat.processor" +} + +func (p *processor) Start() { + if p.flows != nil { + p.flows.Start() + } + p.wg.Add(1) + go func() { + defer p.wg.Done() + + err := p.sniffer.Run() + if err != nil { + p.err <- fmt.Errorf("Sniffer loop failed: %v", err) + } + p.err <- nil + }() +} + +func (p *processor) Stop() { + p.sniffer.Stop() + if p.flows != nil { + p.flows.Stop() + } + p.wg.Wait() +} + +type processorFactory struct { + name string + err chan error + publisher *publish.TransactionPublisher +} + +func newProcessorFactory(name string, err chan error, publisher *publish.TransactionPublisher) *processorFactory { + return &processorFactory{ + name: name, + err: err, + publisher: publisher, + } +} + +func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *common.Config) (cfgfile.Runner, error) { + config := initialConfig() + err := cfg.Unpack(&config) + if err != nil { + logp.Err("fails to read the beat config: %v, %v", err, config) + return nil, err + } + + // normalize agent-based configuration + config, err = config.Normalize() + if err != nil { + logp.Err("failed to normalize the beat config: %v, %v", err, config) + return nil, err + } + + watcher := procs.ProcessesWatcher{} + // Enable the process watcher only if capturing live traffic + if config.Interfaces.File == "" { + err = watcher.Init(config.Procs) + if err != nil { + logp.Critical(err.Error()) + return nil, err + } + } else { + logp.Info("Process watcher disabled when file input is used") + } + + logp.Debug("main", "Initializing protocol plugins") + protocols := protos.NewProtocols() + err = protocols.Init(false, p.publisher, watcher, config.Protocols, config.ProtocolsList) + if err != nil { + return nil, fmt.Errorf("Initializing protocol analyzers failed: %v", err) + } + flows, err := setupFlows(pipeline, watcher, config) + if err != nil { + return nil, err + } + sniffer, err := setupSniffer(config, protocols, workerFactory(p.publisher, protocols, watcher, flows, config)) + if err != nil { + return nil, err + } + + return newProcessor(flows, sniffer, p.err), nil +} + +func (p *processorFactory) CheckConfig(config *common.Config) error { + _, err := p.Create(pipeline.NewNilPipeline(), config) + return err +} diff --git a/packetbeat/beater/reloader.go b/packetbeat/beater/reloader.go new file mode 100644 index 00000000000..6cbec3c6d1d --- /dev/null +++ b/packetbeat/beater/reloader.go @@ -0,0 +1,100 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beater + +import ( + "sync" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/publisher/pipetool" +) + +type reloader struct { + mutex sync.Mutex + factory cfgfile.RunnerFactory + runner cfgfile.Runner + configHash uint64 + pipeline beat.PipelineConnector + logger *logp.Logger +} + +func newReloader(name string, factory cfgfile.RunnerFactory, pipeline beat.PipelineConnector) *reloader { + return &reloader{ + factory: factory, + logger: logp.NewLogger(name), + } +} + +func (r *reloader) Stop() { + r.mutex.Lock() + defer r.mutex.Unlock() + + if r.runner != nil { + r.runner.Stop() + } +} + +func (r *reloader) Reload(config *reload.ConfigWithMeta) error { + r.mutex.Lock() + defer r.mutex.Unlock() + + r.logger.Debug("Starting reload procedure") + + hash, err := cfgfile.HashConfig(config.Config) + if err != nil { + r.logger.Errorf("Unable to hash given config: %s", err) + return errors.Wrap(err, "Unable to hash given config") + } + + if hash == r.configHash { + // we have the same config reloaded + return nil + } + // reinitialize config hash + r.configHash = 0 + + if r.runner != nil { + go r.runner.Stop() + } + // reinitialize runner + r.runner = nil + + c, err := common.NewConfigFrom(config.Config) + if err != nil { + r.logger.Errorf("Unable to create new configuration for factory: %s", err) + return errors.Wrap(err, "Unable to create new configuration for factory") + } + runner, err := r.factory.Create(pipetool.WithDynamicFields(r.pipeline, config.Meta), c) + if err != nil { + r.logger.Errorf("Unable to create new runner: %s", err) + return errors.Wrap(err, "Unable to create new runner") + } + + r.logger.Debugf("Starting runner: %s", runner) + r.configHash = hash + r.runner = runner + runner.Start() + + return nil +} diff --git a/packetbeat/beater/setup.go b/packetbeat/beater/setup.go new file mode 100644 index 00000000000..f8ed8b0aea6 --- /dev/null +++ b/packetbeat/beater/setup.go @@ -0,0 +1,72 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beater + +import ( + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/processors" + "github.com/elastic/beats/v7/packetbeat/config" + "github.com/elastic/beats/v7/packetbeat/flows" + "github.com/elastic/beats/v7/packetbeat/procs" + "github.com/elastic/beats/v7/packetbeat/protos" + "github.com/elastic/beats/v7/packetbeat/sniffer" +) + +func setupSniffer(cfg config.Config, protocols *protos.ProtocolsStruct, workerFactory sniffer.WorkerFactory) (*sniffer.Sniffer, error) { + icmp, err := cfg.ICMP() + if err != nil { + return nil, err + } + + filter := cfg.Interfaces.BpfFilter + if filter == "" && !cfg.Flows.IsEnabled() { + filter = protocols.BpfFilter(cfg.Interfaces.WithVlans, icmp.Enabled()) + } + + return sniffer.New(false, filter, workerFactory, cfg.Interfaces) +} + +func setupFlows(pipeline beat.Pipeline, watcher procs.ProcessesWatcher, cfg config.Config) (*flows.Flows, error) { + if !cfg.Flows.IsEnabled() { + return nil, nil + } + + processors, err := processors.New(cfg.Flows.Processors) + if err != nil { + return nil, err + } + + clientConfig := beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + EventMetadata: cfg.Flows.EventMetadata, + Processor: processors, + KeepNull: cfg.Flows.KeepNull, + }, + } + if cfg.Flows.Index != "" { + clientConfig.Processing.Meta = common.MapStr{"index": cfg.Flows.Index} + } + + client, err := pipeline.ConnectWith(clientConfig) + if err != nil { + return nil, err + } + + return flows.NewFlows(client.PublishAll, watcher, cfg.Flows) +} diff --git a/packetbeat/beater/worker.go b/packetbeat/beater/worker.go new file mode 100644 index 00000000000..5dd6a514454 --- /dev/null +++ b/packetbeat/beater/worker.go @@ -0,0 +1,75 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beater + +import ( + "github.com/tsg/gopacket/layers" + + "github.com/elastic/beats/v7/packetbeat/config" + "github.com/elastic/beats/v7/packetbeat/decoder" + "github.com/elastic/beats/v7/packetbeat/flows" + "github.com/elastic/beats/v7/packetbeat/procs" + "github.com/elastic/beats/v7/packetbeat/protos" + "github.com/elastic/beats/v7/packetbeat/protos/icmp" + "github.com/elastic/beats/v7/packetbeat/protos/tcp" + "github.com/elastic/beats/v7/packetbeat/protos/udp" + "github.com/elastic/beats/v7/packetbeat/publish" + "github.com/elastic/beats/v7/packetbeat/sniffer" +) + +func workerFactory(publisher *publish.TransactionPublisher, protocols *protos.ProtocolsStruct, watcher procs.ProcessesWatcher, flows *flows.Flows, cfg config.Config) func(dl layers.LinkType) (sniffer.Worker, error) { + return func(dl layers.LinkType) (sniffer.Worker, error) { + var icmp4 icmp.ICMPv4Processor + var icmp6 icmp.ICMPv6Processor + config, err := cfg.ICMP() + if err != nil { + return nil, err + } + if config.Enabled() { + reporter, err := publisher.CreateReporter(config) + if err != nil { + return nil, err + } + + icmp, err := icmp.New(false, reporter, watcher, config) + if err != nil { + return nil, err + } + + icmp4 = icmp + icmp6 = icmp + } + + tcp, err := tcp.NewTCP(protocols) + if err != nil { + return nil, err + } + + udp, err := udp.NewUDP(protocols) + if err != nil { + return nil, err + } + + worker, err := decoder.New(flows, dl, icmp4, icmp6, tcp, udp) + if err != nil { + return nil, err + } + + return worker, nil + } +} diff --git a/packetbeat/config/agent.go b/packetbeat/config/agent.go new file mode 100644 index 00000000000..3c5c6b70c6f --- /dev/null +++ b/packetbeat/config/agent.go @@ -0,0 +1,80 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package config + +import ( + "fmt" + "runtime" + "strings" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +var osDefaultDevices = map[string]string{ + "darwin": "en0", + "linux": "any", +} + +func defaultDevice() string { + if device, found := osDefaultDevices[runtime.GOOS]; found { + return device + } + return "0" +} + +// Normalize allows the packetbeat configuration to understand +// agent semantics +func (c Config) Normalize() (Config, error) { + logp.Debug("agent", "Normalizing agent configuration") + if len(c.Inputs) > 0 { + // override everything, we're managed by agent + c.Flows = nil + c.Protocols = nil + c.ProtocolsList = []*common.Config{} + // TODO: make this configurable rather than just using the default device in + // managed mode + c.Interfaces.Device = defaultDevice() + } + + for _, input := range c.Inputs { + if rawInputType, ok := input["type"]; ok { + inputType, ok := rawInputType.(string) + if ok && strings.HasPrefix(inputType, "network/") { + config, err := common.NewConfigFrom(input) + if err != nil { + return c, err + } + protocol := strings.TrimPrefix(inputType, "network/") + logp.Debug("agent", fmt.Sprintf("Found agent configuration for %v", protocol)) + switch protocol { + case "flows": + if err := config.Unpack(&c.Flows); err != nil { + return c, err + } + default: + if err = config.SetString("type", -1, protocol); err != nil { + return c, err + } + c.ProtocolsList = append(c.ProtocolsList, config) + } + } + } + } + return c, nil +} diff --git a/packetbeat/config/agent_test.go b/packetbeat/config/agent_test.go new file mode 100644 index 00000000000..eb82bb90f21 --- /dev/null +++ b/packetbeat/config/agent_test.go @@ -0,0 +1,49 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package config + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestAgentInputNormalization(t *testing.T) { + cfg, err := common.NewConfigFrom(` +inputs: +- type: network/flows + timeout: 10s + period: 10s + keep_null: false + data_stream.namespace: default +- type: network/amqp + ports: [5672] + data_stream.namespace: default +`) + require.NoError(t, err) + config := Config{} + require.NoError(t, cfg.Unpack(&config)) + + config, err = config.Normalize() + require.NoError(t, err) + + require.Equal(t, config.Flows.Timeout, "10s") + require.Len(t, config.ProtocolsList, 1) +} diff --git a/packetbeat/config/config.go b/packetbeat/config/config.go index 893e4828ab8..1d344787055 100644 --- a/packetbeat/config/config.go +++ b/packetbeat/config/config.go @@ -18,6 +18,7 @@ package config import ( + "errors" "time" "github.com/elastic/beats/v7/libbeat/common" @@ -33,6 +34,38 @@ type Config struct { Procs procs.ProcsConfig `config:"procs"` IgnoreOutgoing bool `config:"ignore_outgoing"` ShutdownTimeout time.Duration `config:"shutdown_timeout"` + + // agent configuration + Inputs []map[string]interface{} `config:"inputs"` +} + +// ICMP returns the ICMP configuration +func (c Config) ICMP() (*common.Config, error) { + var icmp *common.Config + if c.Protocols["icmp"].Enabled() { + icmp = c.Protocols["icmp"] + } + + for _, cfg := range c.ProtocolsList { + info := struct { + Type string `config:"type" validate:"required"` + }{} + + if err := cfg.Unpack(&info); err != nil { + return nil, err + } + + if info.Type != "icmp" { + continue + } + + if icmp != nil { + return nil, errors.New("More then one icmp configurations found") + } + + icmp = cfg + } + return icmp, nil } type InterfacesConfig struct { @@ -57,6 +90,8 @@ type Flows struct { EventMetadata common.EventMetadata `config:",inline"` Processors processors.PluginConfig `config:"processors"` KeepNull bool `config:"keep_null"` + // Index is used to overwrite the index where flows are published + Index string `config:"index"` } type ProtocolCommon struct { diff --git a/packetbeat/flows/flows.go b/packetbeat/flows/flows.go index fb292b92bf2..d58a2c45987 100644 --- a/packetbeat/flows/flows.go +++ b/packetbeat/flows/flows.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/packetbeat/config" + "github.com/elastic/beats/v7/packetbeat/procs" ) type Flows struct { @@ -41,7 +42,7 @@ const ( defaultPeriod = 10 * time.Second ) -func NewFlows(pub Reporter, config *config.Flows) (*Flows, error) { +func NewFlows(pub Reporter, watcher procs.ProcessesWatcher, config *config.Flows) (*Flows, error) { duration := func(s string, d time.Duration) (time.Duration, error) { if s == "" { return d, nil @@ -67,7 +68,7 @@ func NewFlows(pub Reporter, config *config.Flows) (*Flows, error) { counter := &counterReg{} - worker, err := newFlowsWorker(pub, table, counter, timeout, period) + worker, err := newFlowsWorker(pub, watcher, table, counter, timeout, period) if err != nil { logp.Err("failed to configure flows processing intervals: %v", err) return nil, err diff --git a/packetbeat/flows/flows_test.go b/packetbeat/flows/flows_test.go index b1c3a5b83b9..e56b3777374 100644 --- a/packetbeat/flows/flows_test.go +++ b/packetbeat/flows/flows_test.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/packetbeat/config" + "github.com/elastic/beats/v7/packetbeat/procs" ) type flowsChan struct { @@ -50,7 +51,7 @@ func TestFlowsCounting(t *testing.T) { port1 := []byte{0, 1} port2 := []byte{0, 2} - module, err := NewFlows(nil, &config.Flows{}) + module, err := NewFlows(nil, procs.ProcessesWatcher{}, &config.Flows{}) assert.NoError(t, err) uint1, err := module.NewUint("uint1") diff --git a/packetbeat/flows/worker.go b/packetbeat/flows/worker.go index 56445801781..2a9ca482ed3 100644 --- a/packetbeat/flows/worker.go +++ b/packetbeat/flows/worker.go @@ -32,6 +32,7 @@ import ( type flowsProcessor struct { spool spool + watcher procs.ProcessesWatcher table *flowMetaTable counters *counterReg timeout time.Duration @@ -44,6 +45,7 @@ var ( func newFlowsWorker( pub Reporter, + watcher procs.ProcessesWatcher, table *flowMetaTable, counters *counterReg, timeout, period time.Duration, @@ -84,6 +86,7 @@ func newFlowsWorker( defaultBatchSize := 1024 processor := &flowsProcessor{ table: table, + watcher: watcher, counters: counters, timeout: timeout, } @@ -194,13 +197,14 @@ func (fw *flowsProcessor) report( isOver bool, intNames, uintNames, floatNames []string, ) { - event := createEvent(ts, flow, isOver, intNames, uintNames, floatNames) + event := createEvent(fw.watcher, ts, flow, isOver, intNames, uintNames, floatNames) debugf("add event: %v", event) fw.spool.publish(event) } func createEvent( + watcher procs.ProcessesWatcher, ts time.Time, f *biFlow, isOver bool, intNames, uintNames, floatNames []string, @@ -386,7 +390,7 @@ func createEvent( // Set process information if it's available if tuple.IPLength != 0 && tuple.SrcPort != 0 { - if proc := procs.ProcWatcher.FindProcessesTuple(&tuple, proto); proc != nil { + if proc := watcher.FindProcessesTuple(&tuple, proto); proc != nil { if proc.Src.PID > 0 { p := common.MapStr{ "pid": proc.Src.PID, diff --git a/packetbeat/flows/worker_test.go b/packetbeat/flows/worker_test.go index 15cef57cc25..3bec75f2fe3 100644 --- a/packetbeat/flows/worker_test.go +++ b/packetbeat/flows/worker_test.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" ) var ( @@ -66,7 +67,7 @@ func TestCreateEvent(t *testing.T) { } bif.stats[0] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{10, 1}} bif.stats[1] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{460, 2}} - event := createEvent(time.Now(), bif, true, nil, []string{"bytes", "packets"}, nil) + event := createEvent(procs.ProcessesWatcher{}, time.Now(), bif, true, nil, []string{"bytes", "packets"}, nil) // Validate the contents of the event. validate := lookslike.MustCompile(map[string]interface{}{ diff --git a/packetbeat/procs/procs.go b/packetbeat/procs/procs.go index dfead47d93c..bf3daab9ff2 100644 --- a/packetbeat/procs/procs.go +++ b/packetbeat/procs/procs.go @@ -83,8 +83,6 @@ type ProcessesWatcher struct { impl processWatcherImpl } -var ProcWatcher ProcessesWatcher - func (proc *ProcessesWatcher) Init(config ProcsConfig) error { return proc.initWithImpl(config, proc) } diff --git a/packetbeat/protos/amqp/amqp.go b/packetbeat/protos/amqp/amqp.go index c361c3e7fe6..1113d4ee6df 100644 --- a/packetbeat/protos/amqp/amqp.go +++ b/packetbeat/protos/amqp/amqp.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/packetbeat/pb" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/tcp" ) @@ -47,6 +48,7 @@ type amqpPlugin struct { transactions *common.Cache transactionTimeout time.Duration results protos.Reporter + watcher procs.ProcessesWatcher //map containing functions associated with different method numbers methodMap map[codeClass]map[codeMethod]amqpMethod @@ -64,6 +66,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &amqpPlugin{} @@ -74,13 +77,13 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (amqp *amqpPlugin) init(results protos.Reporter, config *amqpConfig) error { +func (amqp *amqpPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *amqpConfig) error { amqp.initMethodMap() amqp.setFromConfig(config) @@ -92,6 +95,7 @@ func (amqp *amqpPlugin) init(results protos.Reporter, config *amqpConfig) error protos.DefaultTransactionHashSize) amqp.transactions.StartJanitor(amqp.transactionTimeout) amqp.results = results + amqp.watcher = watcher return nil } diff --git a/packetbeat/protos/amqp/amqp_parser.go b/packetbeat/protos/amqp/amqp_parser.go index eeaaf2a9464..6ab15ec8159 100644 --- a/packetbeat/protos/amqp/amqp_parser.go +++ b/packetbeat/protos/amqp/amqp_parser.go @@ -23,7 +23,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/packetbeat/procs" ) func (amqp *amqpPlugin) amqpMessageParser(s *amqpStream) (ok bool, complete bool) { @@ -336,7 +335,7 @@ func (amqp *amqpPlugin) handleAmqp(m *amqpMessage, tcptuple *common.TCPTuple, di debugf("A message is ready to be handled") m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + m.cmdlineTuple = amqp.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.method == "basic.publish" { amqp.handlePublishing(m) diff --git a/packetbeat/protos/amqp/amqp_test.go b/packetbeat/protos/amqp/amqp_test.go index 37be71c571d..19002b60941 100644 --- a/packetbeat/protos/amqp/amqp_test.go +++ b/packetbeat/protos/amqp/amqp_test.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/publish" ) @@ -45,7 +46,7 @@ func amqpModForTests() (*eventStore, *amqpPlugin) { var amqp amqpPlugin results := &eventStore{} config := defaultConfig - amqp.init(results.publish, &config) + amqp.init(results.publish, procs.ProcessesWatcher{}, &config) return results, &amqp } diff --git a/packetbeat/protos/cassandra/cassandra.go b/packetbeat/protos/cassandra/cassandra.go index ed0f48e91a4..7d3001a5159 100644 --- a/packetbeat/protos/cassandra/cassandra.go +++ b/packetbeat/protos/cassandra/cassandra.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/tcp" @@ -34,6 +35,7 @@ type cassandra struct { ports protos.PortsConfig parserConfig parserConfig transConfig transactionConfig + watcher procs.ProcessesWatcher pub transPub } @@ -60,6 +62,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &cassandra{} @@ -70,17 +73,18 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (cassandra *cassandra) init(results protos.Reporter, config *cassandraConfig) error { +func (cassandra *cassandra) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *cassandraConfig) error { if err := cassandra.setFromConfig(config); err != nil { return err } cassandra.pub.results = results + cassandra.watcher = watcher return nil } @@ -193,7 +197,7 @@ func (cassandra *cassandra) ensureConnection(private protos.ProtocolData) *conne conn := getConnection(private) if conn == nil { conn = &connection{} - conn.trans.init(&cassandra.transConfig, cassandra.pub.onTransaction) + conn.trans.init(&cassandra.transConfig, cassandra.watcher, cassandra.pub.onTransaction) } return conn } diff --git a/packetbeat/protos/cassandra/trans.go b/packetbeat/protos/cassandra/trans.go index 62d36ee3695..9b055d22c88 100644 --- a/packetbeat/protos/cassandra/trans.go +++ b/packetbeat/protos/cassandra/trans.go @@ -33,6 +33,8 @@ type transactions struct { responses messageList onTransaction transactionHandler + + watcher procs.ProcessesWatcher } type transactionConfig struct { @@ -46,8 +48,9 @@ type messageList struct { head, tail *message } -func (trans *transactions) init(c *transactionConfig, cb transactionHandler) { +func (trans *transactions) init(c *transactionConfig, watcher procs.ProcessesWatcher, cb transactionHandler) { trans.config = c + trans.watcher = watcher trans.onTransaction = cb } @@ -59,7 +62,7 @@ func (trans *transactions) onMessage( var err error msg.Tuple = *tuple msg.Transport = applayer.TransportTCP - msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(&msg.Tuple) + msg.CmdlineTuple = trans.watcher.FindProcessesTupleTCP(&msg.Tuple) if msg.IsRequest { if isDebug { diff --git a/packetbeat/protos/dhcpv4/dhcpv4.go b/packetbeat/protos/dhcpv4/dhcpv4.go index 10d299aea76..323f46f2a54 100644 --- a/packetbeat/protos/dhcpv4/dhcpv4.go +++ b/packetbeat/protos/dhcpv4/dhcpv4.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/packetbeat/pb" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/ecs/code/go/ecs" ) @@ -45,12 +46,13 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { - return newPlugin(testMode, results, cfg) + return newPlugin(testMode, results, watcher, cfg) } -func newPlugin(testMode bool, results protos.Reporter, cfg *common.Config) (*dhcpv4Plugin, error) { +func newPlugin(testMode bool, results protos.Reporter, watcher procs.ProcessesWatcher, cfg *common.Config) (*dhcpv4Plugin, error) { config := defaultConfig if !testMode { @@ -62,14 +64,16 @@ func newPlugin(testMode bool, results protos.Reporter, cfg *common.Config) (*dhc return &dhcpv4Plugin{ dhcpv4Config: config, report: results, + watcher: watcher, log: logp.NewLogger("dhcpv4"), }, nil } type dhcpv4Plugin struct { dhcpv4Config - report protos.Reporter - log *logp.Logger + report protos.Reporter + watcher procs.ProcessesWatcher + log *logp.Logger } func (p *dhcpv4Plugin) GetPorts() []int { diff --git a/packetbeat/protos/dhcpv4/dhcpv4_test.go b/packetbeat/protos/dhcpv4/dhcpv4_test.go index 1f7d416248a..e695ecf00cf 100644 --- a/packetbeat/protos/dhcpv4/dhcpv4_test.go +++ b/packetbeat/protos/dhcpv4/dhcpv4_test.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/publish" ) @@ -81,7 +82,7 @@ var ( func TestParseDHCPRequest(t *testing.T) { logp.TestingSetup() - p, err := newPlugin(true, nil, nil) + p, err := newPlugin(true, nil, procs.ProcessesWatcher{}, nil) if err != nil { t.Fatal(err) } @@ -165,7 +166,7 @@ func TestParseDHCPRequest(t *testing.T) { } func TestParseDHCPACK(t *testing.T) { - p, err := newPlugin(true, nil, nil) + p, err := newPlugin(true, nil, procs.ProcessesWatcher{}, nil) if err != nil { t.Fatal(err) } diff --git a/packetbeat/protos/dns/dns.go b/packetbeat/protos/dns/dns.go index 8fbf402b6b4..15aa154276f 100644 --- a/packetbeat/protos/dns/dns.go +++ b/packetbeat/protos/dns/dns.go @@ -38,6 +38,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/packetbeat/pb" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" ) @@ -55,6 +56,7 @@ type dnsPlugin struct { transactionTimeout time.Duration results protos.Reporter // Channel where results are pushed. + watcher procs.ProcessesWatcher } var ( @@ -220,6 +222,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &dnsPlugin{} @@ -230,13 +233,13 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (dns *dnsPlugin) init(results protos.Reporter, config *dnsConfig) error { +func (dns *dnsPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *dnsConfig) error { dns.setFromConfig(config) dns.transactions = common.NewCacheWithRemovalListener( dns.transactionTimeout, @@ -252,6 +255,7 @@ func (dns *dnsPlugin) init(results protos.Reporter, config *dnsConfig) error { dns.transactions.StartJanitor(dns.transactionTimeout) dns.results = results + dns.watcher = watcher return nil } diff --git a/packetbeat/protos/dns/dns_tcp.go b/packetbeat/protos/dns/dns_tcp.go index 310cf43553e..bbf7e736926 100644 --- a/packetbeat/protos/dns/dns_tcp.go +++ b/packetbeat/protos/dns/dns_tcp.go @@ -23,7 +23,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/tcp" @@ -150,7 +149,7 @@ func (dns *dnsPlugin) handleDNS(conn *dnsConnectionData, tcpTuple *common.TCPTup message := conn.data[dir].message dnsTuple := dnsTupleFromIPPort(&message.tuple, transportTCP, decodedData.Id) - message.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcpTuple.IPPort()) + message.cmdlineTuple = dns.watcher.FindProcessesTupleTCP(tcpTuple.IPPort()) message.data = decodedData message.length += decodeOffset diff --git a/packetbeat/protos/dns/dns_test.go b/packetbeat/protos/dns/dns_test.go index c5ee52eb5eb..17303783fca 100644 --- a/packetbeat/protos/dns/dns_test.go +++ b/packetbeat/protos/dns/dns_test.go @@ -34,6 +34,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/publish" ) @@ -111,7 +112,7 @@ func newDNS(store *eventStore, verbose bool) *dnsPlugin { "send_request": true, "send_response": true, }) - dns, err := New(false, callback, cfg) + dns, err := New(false, callback, procs.ProcessesWatcher{}, cfg) if err != nil { panic(err) } diff --git a/packetbeat/protos/dns/dns_udp.go b/packetbeat/protos/dns/dns_udp.go index 652e03bb717..c1a22c7536f 100644 --- a/packetbeat/protos/dns/dns_udp.go +++ b/packetbeat/protos/dns/dns_udp.go @@ -20,7 +20,6 @@ package dns import ( "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" ) @@ -47,7 +46,7 @@ func (dns *dnsPlugin) ParseUDP(pkt *protos.Packet) { dnsMsg := &dnsMessage{ ts: pkt.Ts, tuple: pkt.Tuple, - cmdlineTuple: procs.ProcWatcher.FindProcessesTupleUDP(&pkt.Tuple), + cmdlineTuple: dns.watcher.FindProcessesTupleUDP(&pkt.Tuple), data: dnsPkt, length: packetSize, } diff --git a/packetbeat/protos/http/http.go b/packetbeat/protos/http/http.go index 4b2367c0239..3dd7484822e 100644 --- a/packetbeat/protos/http/http.go +++ b/packetbeat/protos/http/http.go @@ -97,6 +97,7 @@ type httpPlugin struct { transactionTimeout time.Duration results protos.Reporter + watcher procs.ProcessesWatcher } var ( @@ -111,6 +112,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &httpPlugin{} @@ -121,19 +123,20 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } // Init initializes the HTTP protocol analyser. -func (http *httpPlugin) init(results protos.Reporter, config *httpConfig) error { +func (http *httpPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *httpConfig) error { http.setFromConfig(config) isDebug = logp.IsDebug("http") isDetailed = logp.IsDebug("httpdetailed") http.results = results + http.watcher = watcher return nil } @@ -435,7 +438,7 @@ func (http *httpPlugin) handleHTTP( m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + m.cmdlineTuple = http.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) http.hideHeaders(m) if m.isRequest { diff --git a/packetbeat/protos/http/http_test.go b/packetbeat/protos/http/http_test.go index 2e2995ff463..53f55214851 100644 --- a/packetbeat/protos/http/http_test.go +++ b/packetbeat/protos/http/http_test.go @@ -33,6 +33,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/publish" ) @@ -88,7 +89,7 @@ func httpModForTests(store *eventStore) *httpPlugin { callback = store.publish } - http, err := New(false, callback, common.NewConfig()) + http, err := New(false, callback, procs.ProcessesWatcher{}, common.NewConfig()) if err != nil { panic(err) } diff --git a/packetbeat/protos/icmp/icmp.go b/packetbeat/protos/icmp/icmp.go index 6fb210fd871..f86dd291886 100644 --- a/packetbeat/protos/icmp/icmp.go +++ b/packetbeat/protos/icmp/icmp.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/packetbeat/flows" "github.com/elastic/beats/v7/packetbeat/pb" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/tsg/gopacket/layers" @@ -45,6 +46,7 @@ type icmpPlugin struct { transactionTimeout time.Duration results protos.Reporter + watcher procs.ProcessesWatcher } type ICMPv4Processor interface { @@ -74,7 +76,7 @@ var ( duplicateRequests = monitoring.NewInt(nil, "icmp.duplicate_requests") ) -func New(testMode bool, results protos.Reporter, cfg *common.Config) (*icmpPlugin, error) { +func New(testMode bool, results protos.Reporter, watcher procs.ProcessesWatcher, cfg *common.Config) (*icmpPlugin, error) { p := &icmpPlugin{} config := defaultConfig if !testMode { @@ -83,13 +85,13 @@ func New(testMode bool, results protos.Reporter, cfg *common.Config) (*icmpPlugi } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (icmp *icmpPlugin) init(results protos.Reporter, config *icmpConfig) error { +func (icmp *icmpPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *icmpConfig) error { icmp.setFromConfig(config) var err error @@ -112,6 +114,7 @@ func (icmp *icmpPlugin) init(results protos.Reporter, config *icmpConfig) error icmp.transactions.StartJanitor(icmp.transactionTimeout) icmp.results = results + icmp.watcher = watcher return nil } diff --git a/packetbeat/protos/icmp/icmp_test.go b/packetbeat/protos/icmp/icmp_test.go index 3ad537fa7d4..fc9508fbcdc 100644 --- a/packetbeat/protos/icmp/icmp_test.go +++ b/packetbeat/protos/icmp/icmp_test.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/tsg/gopacket" @@ -60,7 +61,7 @@ func TestIcmpDirection(t *testing.T) { func BenchmarkIcmpProcessICMPv4(b *testing.B) { logp.TestingSetup(logp.WithSelectors("icmp", "icmpdetailed")) - icmp, err := New(true, func(beat.Event) {}, common.NewConfig()) + icmp, err := New(true, func(beat.Event) {}, procs.ProcessesWatcher{}, common.NewConfig()) if err != nil { b.Error("Failed to create ICMP processor") return diff --git a/packetbeat/protos/memcache/memcache.go b/packetbeat/protos/memcache/memcache.go index e59550287a5..39bfccd255a 100644 --- a/packetbeat/protos/memcache/memcache.go +++ b/packetbeat/protos/memcache/memcache.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/applayer" ) @@ -38,6 +39,7 @@ import ( type memcache struct { ports protos.PortsConfig results protos.Reporter + watcher procs.ProcessesWatcher config parserConfig udpMemcache @@ -131,6 +133,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &memcache{} @@ -141,14 +144,14 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } // Called to initialize the Plugin -func (mc *memcache) init(results protos.Reporter, config *memcacheConfig) error { +func (mc *memcache) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *memcacheConfig) error { debug("init memcache plugin") mc.handler = mc @@ -158,6 +161,7 @@ func (mc *memcache) init(results protos.Reporter, config *memcacheConfig) error mc.udpConnections = make(map[common.HashableIPPortTuple]*udpConnection) mc.results = results + mc.watcher = watcher return nil } diff --git a/packetbeat/protos/memcache/memcache_test.go b/packetbeat/protos/memcache/memcache_test.go index b36483770c1..641dec070e1 100644 --- a/packetbeat/protos/memcache/memcache_test.go +++ b/packetbeat/protos/memcache/memcache_test.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/packetbeat/procs" ) type memcacheTest struct { @@ -36,7 +37,7 @@ type memcacheTest struct { func newMemcacheTest(config memcacheConfig) *memcacheTest { mct := &memcacheTest{} mc := &memcache{} - mc.init(nil, &config) + mc.init(nil, procs.ProcessesWatcher{}, &config) mc.handler = mct mct.mc = mc return mct diff --git a/packetbeat/protos/memcache/plugin_tcp.go b/packetbeat/protos/memcache/plugin_tcp.go index e9dded17dd6..830a0cd64a5 100644 --- a/packetbeat/protos/memcache/plugin_tcp.go +++ b/packetbeat/protos/memcache/plugin_tcp.go @@ -25,7 +25,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/applayer" "github.com/elastic/beats/v7/packetbeat/protos/tcp" @@ -191,7 +190,7 @@ func (mc *memcache) onTCPMessage( ) error { msg.Tuple = *tuple msg.Transport = applayer.TransportTCP - msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tuple) + msg.CmdlineTuple = mc.watcher.FindProcessesTupleTCP(tuple) if msg.IsRequest { return mc.onTCPRequest(conn, tuple, dir, msg) diff --git a/packetbeat/protos/memcache/plugin_udp.go b/packetbeat/protos/memcache/plugin_udp.go index 850c6e421fb..441b286a49e 100644 --- a/packetbeat/protos/memcache/plugin_udp.go +++ b/packetbeat/protos/memcache/plugin_udp.go @@ -27,7 +27,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common/streambuf" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/applayer" ) @@ -184,7 +183,7 @@ func (mc *memcache) onUDPMessage( } msg.Tuple = *tuple msg.Transport = applayer.TransportUDP - msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTupleUDP(tuple) + msg.CmdlineTuple = mc.watcher.FindProcessesTupleUDP(tuple) done := false var err error diff --git a/packetbeat/protos/mongodb/mongodb.go b/packetbeat/protos/mongodb/mongodb.go index ac3e66dca5e..28a9350840e 100644 --- a/packetbeat/protos/mongodb/mongodb.go +++ b/packetbeat/protos/mongodb/mongodb.go @@ -47,6 +47,7 @@ type mongodbPlugin struct { transactionTimeout time.Duration results protos.Reporter + watcher procs.ProcessesWatcher } type transactionKey struct { @@ -65,6 +66,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &mongodbPlugin{} @@ -75,13 +77,13 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (mongodb *mongodbPlugin) init(results protos.Reporter, config *mongodbConfig) error { +func (mongodb *mongodbPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *mongodbConfig) error { debugf("Init a MongoDB protocol parser") mongodb.setFromConfig(config) @@ -94,6 +96,7 @@ func (mongodb *mongodbPlugin) init(results protos.Reporter, config *mongodbConfi protos.DefaultTransactionHashSize) mongodb.responses.StartJanitor(mongodb.transactionTimeout) mongodb.results = results + mongodb.watcher = watcher return nil } @@ -218,7 +221,7 @@ func (mongodb *mongodbPlugin) handleMongodb( m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + m.cmdlineTuple = mongodb.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.isResponse { debugf("MongoDB response message") diff --git a/packetbeat/protos/mongodb/mongodb_test.go b/packetbeat/protos/mongodb/mongodb_test.go index 639a2ee7e78..2debae92dff 100644 --- a/packetbeat/protos/mongodb/mongodb_test.go +++ b/packetbeat/protos/mongodb/mongodb_test.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" ) @@ -46,7 +47,7 @@ func mongodbModForTests() (*eventStore, *mongodbPlugin) { var mongodb mongodbPlugin results := &eventStore{} config := defaultConfig - mongodb.init(results.publish, &config) + mongodb.init(results.publish, procs.ProcessesWatcher{}, &config) return results, &mongodb } diff --git a/packetbeat/protos/mysql/mysql.go b/packetbeat/protos/mysql/mysql.go index 4d08debf976..506b6c30ca8 100644 --- a/packetbeat/protos/mysql/mysql.go +++ b/packetbeat/protos/mysql/mysql.go @@ -158,6 +158,7 @@ type mysqlPlugin struct { prepareStatementTimeout time.Duration results protos.Reporter + watcher procs.ProcessesWatcher // function pointer for mocking handleMysql func(mysql *mysqlPlugin, m *mysqlMessage, tcp *common.TCPTuple, @@ -171,6 +172,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &mysqlPlugin{} @@ -181,13 +183,13 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (mysql *mysqlPlugin) init(results protos.Reporter, config *mysqlConfig) error { +func (mysql *mysqlPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *mysqlConfig) error { mysql.setFromConfig(config) mysql.transactions = common.NewCache( @@ -203,6 +205,7 @@ func (mysql *mysqlPlugin) init(results protos.Reporter, config *mysqlConfig) err mysql.handleMysql = handleMysql mysql.results = results + mysql.watcher = watcher return nil } @@ -651,7 +654,7 @@ func handleMysql(mysql *mysqlPlugin, m *mysqlMessage, tcptuple *common.TCPTuple, m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + m.cmdlineTuple = mysql.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) m.raw = rawMsg if m.isRequest { diff --git a/packetbeat/protos/mysql/mysql_test.go b/packetbeat/protos/mysql/mysql_test.go index d55917114ee..4e9123c5617 100644 --- a/packetbeat/protos/mysql/mysql_test.go +++ b/packetbeat/protos/mysql/mysql_test.go @@ -31,6 +31,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/tcp" "github.com/elastic/beats/v7/packetbeat/publish" @@ -60,7 +61,7 @@ func mysqlModForTests(store *eventStore) *mysqlPlugin { var mysql mysqlPlugin config := defaultConfig config.Ports = []int{serverPort} - mysql.init(callback, &config) + mysql.init(callback, procs.ProcessesWatcher{}, &config) return &mysql } diff --git a/packetbeat/protos/nfs/rpc.go b/packetbeat/protos/nfs/rpc.go index f115cf19fba..9cde7ab5aac 100644 --- a/packetbeat/protos/nfs/rpc.go +++ b/packetbeat/protos/nfs/rpc.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/tcp" ) @@ -70,6 +71,7 @@ func init() { func New( testMode bool, results protos.Reporter, + _ procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &rpc{} diff --git a/packetbeat/protos/pgsql/pgsql.go b/packetbeat/protos/pgsql/pgsql.go index 69bfb468887..5ad6f6e305a 100644 --- a/packetbeat/protos/pgsql/pgsql.go +++ b/packetbeat/protos/pgsql/pgsql.go @@ -49,6 +49,7 @@ type pgsqlPlugin struct { transactionTimeout time.Duration results protos.Reporter + watcher procs.ProcessesWatcher // function pointer for mocking handlePgsql func(pgsql *pgsqlPlugin, m *pgsqlMessage, tcp *common.TCPTuple, @@ -140,6 +141,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &pgsqlPlugin{} @@ -150,13 +152,13 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (pgsql *pgsqlPlugin) init(results protos.Reporter, config *pgsqlConfig) error { +func (pgsql *pgsqlPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *pgsqlConfig) error { pgsql.setFromConfig(config) pgsql.log = logp.NewLogger("pgsql") @@ -170,6 +172,7 @@ func (pgsql *pgsqlPlugin) init(results protos.Reporter, config *pgsqlConfig) err pgsql.transactions.StartJanitor(pgsql.transactionTimeout) pgsql.handlePgsql = handlePgsql pgsql.results = results + pgsql.watcher = watcher return nil } @@ -379,7 +382,7 @@ var handlePgsql = func(pgsql *pgsqlPlugin, m *pgsqlMessage, tcptuple *common.TCP m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + m.cmdlineTuple = pgsql.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.isRequest { pgsql.receivedPgsqlRequest(m) diff --git a/packetbeat/protos/pgsql/pgsql_test.go b/packetbeat/protos/pgsql/pgsql_test.go index db735c64a5d..737f2905a91 100644 --- a/packetbeat/protos/pgsql/pgsql_test.go +++ b/packetbeat/protos/pgsql/pgsql_test.go @@ -31,6 +31,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/publish" ) @@ -56,7 +57,7 @@ func pgsqlModForTests(store *eventStore) *pgsqlPlugin { var pgsql pgsqlPlugin config := defaultConfig - pgsql.init(callback, &config) + pgsql.init(callback, procs.ProcessesWatcher{}, &config) return &pgsql } diff --git a/packetbeat/protos/protos.go b/packetbeat/protos/protos.go index 9991458eb2b..e0343a0ee87 100644 --- a/packetbeat/protos/protos.go +++ b/packetbeat/protos/protos.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" ) const ( @@ -92,11 +93,12 @@ type ProtocolsStruct struct { udp map[Protocol]UDPPlugin } -// Singleton of Protocols type. -var Protos = ProtocolsStruct{ - all: map[Protocol]protocolInstance{}, - tcp: map[Protocol]TCPPlugin{}, - udp: map[Protocol]UDPPlugin{}, +func NewProtocols() *ProtocolsStruct { + return &ProtocolsStruct{ + all: map[Protocol]protocolInstance{}, + tcp: map[Protocol]TCPPlugin{}, + udp: map[Protocol]UDPPlugin{}, + } } type protocolInstance struct { @@ -111,6 +113,7 @@ type reporterFactory interface { func (s ProtocolsStruct) Init( testMode bool, pub reporterFactory, + watcher procs.ProcessesWatcher, configs map[string]*common.Config, listConfigs []*common.Config, ) error { @@ -123,7 +126,7 @@ func (s ProtocolsStruct) Init( } for name, config := range configs { - if err := s.configureProtocol(testMode, pub, name, config); err != nil { + if err := s.configureProtocol(testMode, pub, watcher, name, config); err != nil { return err } } @@ -136,7 +139,7 @@ func (s ProtocolsStruct) Init( return err } - if err := s.configureProtocol(testMode, pub, module.Name, config); err != nil { + if err := s.configureProtocol(testMode, pub, watcher, module.Name, config); err != nil { return err } } @@ -147,6 +150,7 @@ func (s ProtocolsStruct) Init( func (s ProtocolsStruct) configureProtocol( testMode bool, pub reporterFactory, + watcher procs.ProcessesWatcher, name string, config *common.Config, ) error { @@ -182,7 +186,7 @@ func (s ProtocolsStruct) configureProtocol( } } - inst, err := plugin(testMode, results, config) + inst, err := plugin(testMode, results, watcher, config) if err != nil { logp.Err("Failed to register protocol plugin: %v", err) return err diff --git a/packetbeat/protos/redis/redis.go b/packetbeat/protos/redis/redis.go index bf23e94836f..23dd1ad8696 100644 --- a/packetbeat/protos/redis/redis.go +++ b/packetbeat/protos/redis/redis.go @@ -55,6 +55,7 @@ type redisPlugin struct { transactionTimeout time.Duration queueConfig MessageQueueConfig + watcher procs.ProcessesWatcher results protos.Reporter } @@ -75,6 +76,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &redisPlugin{} @@ -85,16 +87,17 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (redis *redisPlugin) init(results protos.Reporter, config *redisConfig) error { +func (redis *redisPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *redisConfig) error { redis.setFromConfig(config) redis.results = results + redis.watcher = watcher isDebug = logp.IsDebug("redis") return nil @@ -247,7 +250,7 @@ func (redis *redisPlugin) handleRedis( ) { m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + m.cmdlineTuple = redis.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.isRequest { // wait for response diff --git a/packetbeat/protos/registry.go b/packetbeat/protos/registry.go index f1fc17b7074..1d1bd2c7b88 100644 --- a/packetbeat/protos/registry.go +++ b/packetbeat/protos/registry.go @@ -22,11 +22,14 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + + "github.com/elastic/beats/v7/packetbeat/procs" ) type ProtocolPlugin func( testMode bool, results Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (Plugin, error) diff --git a/packetbeat/protos/sip/parser.go b/packetbeat/protos/sip/parser.go index 55e66045e95..7ee5a10bb5b 100644 --- a/packetbeat/protos/sip/parser.go +++ b/packetbeat/protos/sip/parser.go @@ -87,6 +87,7 @@ const ( ) type parser struct { + watcher procs.ProcessesWatcher } type parsingInfo struct { @@ -117,15 +118,17 @@ var ( nameVia = []byte("via") ) -func newParser() *parser { - return &parser{} +func newParser(watcher procs.ProcessesWatcher) *parser { + return &parser{ + watcher: watcher, + } } func (parser *parser) parse(pi *parsingInfo) (*message, error) { m := &message{ ts: pi.pkt.Ts, ipPortTuple: pi.pkt.Tuple, - cmdlineTuple: procs.ProcWatcher.FindProcessesTupleTCP(&pi.pkt.Tuple), + cmdlineTuple: parser.watcher.FindProcessesTupleTCP(&pi.pkt.Tuple), rawData: pi.data, } for pi.parseOffset < len(pi.data) { diff --git a/packetbeat/protos/sip/plugin.go b/packetbeat/protos/sip/plugin.go index 14b56aeda45..0c28b5eb3a4 100644 --- a/packetbeat/protos/sip/plugin.go +++ b/packetbeat/protos/sip/plugin.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/packetbeat/pb" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" ) @@ -46,6 +47,7 @@ type plugin struct { keepOriginal bool results protos.Reporter + watcher procs.ProcessesWatcher } var ( @@ -60,6 +62,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { cfgwarn.Beta("packetbeat SIP protocol is used") @@ -72,19 +75,20 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } // Init initializes the HTTP protocol analyser. -func (p *plugin) init(results protos.Reporter, config *config) error { +func (p *plugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *config) error { p.setFromConfig(config) isDebug = logp.IsDebug("sip") isDetailed = logp.IsDebug("sipdetailed") p.results = results + p.watcher = watcher return nil } @@ -112,7 +116,7 @@ func (p *plugin) doParse(pkt *protos.Packet) error { detailedf("Payload received: [%s]", pkt.Payload) } - parser := newParser() + parser := newParser(p.watcher) pi := newParsingInfo(pkt) m, err := parser.parse(pi) diff --git a/packetbeat/protos/sip/plugin_test.go b/packetbeat/protos/sip/plugin_test.go index d8c09f5b307..5b09f522aff 100644 --- a/packetbeat/protos/sip/plugin_test.go +++ b/packetbeat/protos/sip/plugin_test.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" ) @@ -114,7 +115,7 @@ func TestParseUDP(t *testing.T) { gotEvent = &evt } const data = "INVITE sip:test@10.0.2.15:5060 SIP/2.0\r\nVia: SIP/2.0/UDP 10.0.2.20:5060;branch=z9hG4bK-2187-1-0\r\nFrom: \"DVI4/8000\" ;tag=1\r\nTo: test \r\nCall-ID: 1-2187@10.0.2.20\r\nCSeq: 1 INVITE\r\nContact: sip:sipp@10.0.2.20:5060\r\nMax-Forwards: 70\r\nContent-Type: application/sdp\r\nContent-Length: 123\r\n\r\nv=0\r\no=- 42 42 IN IP4 10.0.2.20\r\ns=-\r\nc=IN IP4 10.0.2.20\r\nt=0 0\r\nm=audio 6000 RTP/AVP 5\r\na=rtpmap:5 DVI4/8000\r\na=recvonly\r\n" - p, _ := New(true, reporter, nil) + p, _ := New(true, reporter, procs.ProcessesWatcher{}, nil) plugin := p.(*plugin) plugin.ParseUDP(&protos.Packet{ Ts: time.Now(), diff --git a/packetbeat/protos/tcp/tcp_test.go b/packetbeat/protos/tcp/tcp_test.go index 092d1f6310c..c014e870c6b 100644 --- a/packetbeat/protos/tcp/tcp_test.go +++ b/packetbeat/protos/tcp/tcp_test.go @@ -26,6 +26,7 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/stretchr/testify/assert" @@ -44,7 +45,7 @@ var ( ) func init() { - new := func(_ bool, _ protos.Reporter, _ *common.Config) (protos.Plugin, error) { + new := func(_ bool, _ protos.Reporter, _ procs.ProcessesWatcher, _ *common.Config) (protos.Plugin, error) { return &TestProtocol{}, nil } diff --git a/packetbeat/protos/thrift/thrift.go b/packetbeat/protos/thrift/thrift.go index 8c15b9bdf9c..d9778031d76 100644 --- a/packetbeat/protos/thrift/thrift.go +++ b/packetbeat/protos/thrift/thrift.go @@ -57,6 +57,7 @@ type thriftPlugin struct { publishQueue chan *thriftTransaction results protos.Reporter + watcher procs.ProcessesWatcher idl *thriftIdl } @@ -182,6 +183,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &thriftPlugin{} @@ -192,7 +194,7 @@ func New( } } - if err := p.init(testMode, results, &config); err != nil { + if err := p.init(testMode, results, watcher, &config); err != nil { return nil, err } return p, nil @@ -201,6 +203,7 @@ func New( func (thrift *thriftPlugin) init( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, config *thriftConfig, ) error { thrift.InitDefaults() @@ -218,6 +221,7 @@ func (thrift *thriftPlugin) init( if !testMode { thrift.publishQueue = make(chan *thriftTransaction, 1000) thrift.results = results + thrift.watcher = watcher go thrift.publishTransactions() } @@ -894,7 +898,7 @@ func (thrift *thriftPlugin) messageComplete(tcptuple *common.TCPTuple, dir uint8 // all ok, go to next level stream.message.tcpTuple = *tcptuple stream.message.direction = dir - stream.message.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + stream.message.cmdlineTuple = thrift.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) if stream.message.frameSize == 0 { stream.message.frameSize = uint32(stream.parseOffset - stream.message.start) } diff --git a/packetbeat/protos/thrift/thrift_test.go b/packetbeat/protos/thrift/thrift_test.go index 2c6618bab77..e1eca793e42 100644 --- a/packetbeat/protos/thrift/thrift_test.go +++ b/packetbeat/protos/thrift/thrift_test.go @@ -26,13 +26,14 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" ) func thriftForTests() *thriftPlugin { t := &thriftPlugin{} config := defaultConfig - t.init(true, nil, &config) + t.init(true, nil, procs.ProcessesWatcher{}, &config) return t } diff --git a/packetbeat/protos/tls/tls.go b/packetbeat/protos/tls/tls.go index 74034c4afaf..e91c78d69b8 100644 --- a/packetbeat/protos/tls/tls.go +++ b/packetbeat/protos/tls/tls.go @@ -60,6 +60,7 @@ type tlsPlugin struct { fingerprints []*FingerprintAlgorithm transactionTimeout time.Duration results protos.Reporter + watcher procs.ProcessesWatcher } var ( @@ -78,6 +79,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &tlsPlugin{} @@ -88,18 +90,19 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (plugin *tlsPlugin) init(results protos.Reporter, config *tlsConfig) error { +func (plugin *tlsPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *tlsConfig) error { if err := plugin.setFromConfig(config); err != nil { return err } plugin.results = results + plugin.watcher = watcher isDebug = logp.IsDebug("tls") return nil @@ -178,7 +181,7 @@ func (plugin *tlsPlugin) doParse( st := conn.streams[dir] if st == nil { st = newStream(tcptuple) - st.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + st.cmdlineTuple = plugin.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) conn.streams[dir] = st } diff --git a/packetbeat/protos/tls/tls_test.go b/packetbeat/protos/tls/tls_test.go index 512294f2d4f..90aadb07a95 100644 --- a/packetbeat/protos/tls/tls_test.go +++ b/packetbeat/protos/tls/tls_test.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/publish" ) @@ -66,7 +67,7 @@ func testInit() (*eventStore, *tlsPlugin) { logp.TestingSetup(logp.WithSelectors("tls", "tlsdetailed")) results := &eventStore{} - tls, err := New(true, results.publish, nil) + tls, err := New(true, results.publish, procs.ProcessesWatcher{}, nil) if err != nil { return nil, nil } diff --git a/packetbeat/publish/publish.go b/packetbeat/publish/publish.go index 7890a1c173d..bd904030b59 100644 --- a/packetbeat/publish/publish.go +++ b/packetbeat/publish/publish.go @@ -84,6 +84,7 @@ func (p *TransactionPublisher) CreateReporter( // load and register the module it's fields, tags and processors settings meta := struct { + Index string `config:"index"` Event common.EventMetadata `config:",inline"` Processors processors.PluginConfig `config:"processors"` KeepNull bool `config:"keep_null"` @@ -107,6 +108,9 @@ func (p *TransactionPublisher) CreateReporter( if p.canDrop { clientConfig.PublishMode = beat.DropIfFull } + if meta.Index != "" { + clientConfig.Processing.Meta = common.MapStr{"index": meta.Index} + } client, err := p.pipeline.ConnectWith(clientConfig) if err != nil { diff --git a/packetbeat/scripts/tcp-protocol/{protocol}/trans.go.tmpl b/packetbeat/scripts/tcp-protocol/{protocol}/trans.go.tmpl index 4f7ad362bfe..c25d06f0d65 100644 --- a/packetbeat/scripts/tcp-protocol/{protocol}/trans.go.tmpl +++ b/packetbeat/scripts/tcp-protocol/{protocol}/trans.go.tmpl @@ -16,6 +16,8 @@ type transactions struct { responses messageList onTransaction transactionHandler + + watcher procs.ProcessesWatcher } type transactionConfig struct { @@ -29,8 +31,9 @@ type messageList struct { head, tail *message } -func (trans *transactions) init(c *transactionConfig, cb transactionHandler) { +func (trans *transactions) init(c *transactionConfig, watcher procs.ProcessesWatcher, cb transactionHandler) { trans.config = c + trans.watcher = watcher trans.onTransaction = cb } @@ -43,7 +46,7 @@ func (trans *transactions) onMessage( msg.Tuple = *tuple msg.Transport = applayer.TransportTCP - msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTuple(&msg.Tuple) + msg.CmdlineTuple = trans.watcher.FindProcessesTuple(&msg.Tuple) if msg.IsRequest { if isDebug { diff --git a/packetbeat/scripts/tcp-protocol/{protocol}/{protocol}.go.tmpl b/packetbeat/scripts/tcp-protocol/{protocol}/{protocol}.go.tmpl index f783e840301..8af08842fd4 100644 --- a/packetbeat/scripts/tcp-protocol/{protocol}/{protocol}.go.tmpl +++ b/packetbeat/scripts/tcp-protocol/{protocol}/{protocol}.go.tmpl @@ -6,6 +6,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/tcp" ) @@ -15,6 +16,7 @@ type {plugin_type} struct { ports protos.PortsConfig parserConfig parserConfig transConfig transactionConfig + watcher procs.ProcessesWatcher pub transPub } @@ -45,6 +47,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &{plugin_type}{} @@ -55,17 +58,18 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func ({plugin_var} *{plugin_type}) init(results protos.Reporter, config *{protocol}Config) error { +func ({plugin_var} *{plugin_type}) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *{protocol}Config) error { if err := {plugin_var}.setFromConfig(config); err != nil { return err } {plugin_var}.pub.results = results + {plugin_var}.watcher = watcher isDebug = logp.IsDebug("http") return nil @@ -162,7 +166,7 @@ func ({plugin_var} *{plugin_type}) ensureConnection(private protos.ProtocolData) conn := getConnection(private) if conn == nil { conn = &connection{} - conn.trans.init(&{plugin_var}.transConfig, {plugin_var}.pub.onTransaction) + conn.trans.init(&{plugin_var}.transConfig, {plugin_var}.watcher, {plugin_var}.pub.onTransaction) } return conn } From cce8f7b974ec5365c65f4e6cd753eaba94a8ac36 Mon Sep 17 00:00:00 2001 From: Andrew Stucki Date: Mon, 26 Oct 2020 09:58:48 -0400 Subject: [PATCH 2/8] Add documentation changes and a Changelog entry --- CHANGELOG.next.asciidoc | 2 +- packetbeat/docs/packetbeat-options.asciidoc | 11 +++++ packetbeat/packetbeat.reference.yml | 45 +++++++++++++++++++++ 3 files changed, 57 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1bf2cc8f762..0492ffd0464 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -778,7 +778,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add ECS fields for x509 certs, event categorization, and related IP info. {pull}19167[19167] - Add 100-continue support {issue}15830[15830] {pull}19349[19349] - Add initial SIP protocol support {pull}21221[21221] - +- Add support for overriding the published index on a per-protoocl/flow basis. {pull}22134[22134] *Functionbeat* - Add basic ECS categorization and `cloud` fields. {pull}19174[19174] diff --git a/packetbeat/docs/packetbeat-options.asciidoc b/packetbeat/docs/packetbeat-options.asciidoc index 32d9c473054..d3777c594c4 100644 --- a/packetbeat/docs/packetbeat-options.asciidoc +++ b/packetbeat/docs/packetbeat-options.asciidoc @@ -431,6 +431,11 @@ processors in your config. If this option is set to true, fields with `null` values will be published in the output document. By default, `keep_null` is set to `false`. +[float] +==== `index` + +Overrides the index that flow events are published to. + [[configuration-protocols]] == Configure which transaction protocols to monitor @@ -554,6 +559,12 @@ custom fields as top-level fields, set the `fields_under_root` option to true. If a duplicate field is declared in the general configuration, then its value will be overwritten by the value declared here. +[float] +[[packetbeat-configuration-index]] +==== `index` + +Overrides the index that events for the given protocol are published to. + [source,yaml] -------------------------------------------------------------------------------- packetbeat.protocols: diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml index 6b936240bbb..da6d176be5a 100644 --- a/packetbeat/packetbeat.reference.yml +++ b/packetbeat/packetbeat.reference.yml @@ -63,6 +63,9 @@ packetbeat.flows: # Set to true to publish fields with null values in events. #keep_null: false + # Overrides where flow events are indexed. + #index: my-custom-flow-index + # =========================== Transaction protocols ============================ packetbeat.protocols: @@ -73,6 +76,9 @@ packetbeat.protocols: # Set to true to publish fields with null values in events. #keep_null: false + # Overrides where this protocol's events are indexed. + #index: my-custom-icmp-index + - type: amqp # Enable AMQP monitoring. Default: true #enabled: true @@ -113,6 +119,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-amqp-index + - type: cassandra #Cassandra port for traffic monitoring. ports: [9042] @@ -143,6 +152,9 @@ packetbeat.protocols: # This option indicates which Operator/Operators will be ignored. #ignored_ops: ["SUPPORTED","OPTIONS"] + # Overrides where this protocol's events are indexed. + #index: my-custom-cassandra-index + - type: dhcpv4 # Configure the DHCP for IPv4 ports. ports: [67, 68] @@ -183,6 +195,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-dhcpv4-index + - type: http # Enable HTTP monitoring. Default: true #enabled: true @@ -257,6 +272,9 @@ packetbeat.protocols: # be trimmed to this size. Default is 10 MB. #max_message_size: 10485760 + # Overrides where this protocol's events are indexed. + #index: my-custom-http-index + - type: memcache # Enable memcache monitoring. Default: true #enabled: true @@ -309,6 +327,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-memcache-index + - type: mysql # Enable mysql monitoring. Default: true #enabled: true @@ -332,6 +353,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-mysql-index + - type: pgsql # Enable pgsql monitoring. Default: true #enabled: true @@ -355,6 +379,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-pgsql-index + - type: redis # Enable redis monitoring. Default: true #enabled: true @@ -387,6 +414,9 @@ packetbeat.protocols: # large enough to allow for pipelining. #queue_max_messages: 20000 + # Overrides where this protocol's events are indexed. + #index: my-custom-redis-index + - type: thrift # Enable thrift monitoring. Default: true #enabled: true @@ -445,6 +475,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-thrift-index + - type: mongodb # Enable mongodb monitoring. Default: true #enabled: true @@ -478,6 +511,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-mongodb-index + - type: nfs # Enable NFS monitoring. Default: true #enabled: true @@ -501,6 +537,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-nfs-index + - type: tls # Enable TLS monitoring. Default: true #enabled: true @@ -531,6 +570,9 @@ packetbeat.protocols: # Set to true to publish fields with null values in events. #keep_null: false + # Overrides where this protocol's events are indexed. + #index: my-custom-tls-index + - type: sip # Configure the ports where to listen for SIP traffic. You can disable the SIP protocol by commenting out the list of ports. ports: [5060] @@ -544,6 +586,9 @@ packetbeat.protocols: # Preserve original contents in event.original keep_original: true + # Overrides where this protocol's events are indexed. + #index: my-custom-sip-index + # ============================ Monitored processes ============================= # Packetbeat can enrich events with information about the process associated From a242bdb9629b870189301a56d8b8cfe96444c729 Mon Sep 17 00:00:00 2001 From: Andrew Stucki Date: Mon, 26 Oct 2020 10:49:07 -0400 Subject: [PATCH 3/8] Update reference template --- .../_meta/config/beat.reference.yml.tmpl | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/packetbeat/_meta/config/beat.reference.yml.tmpl b/packetbeat/_meta/config/beat.reference.yml.tmpl index 722c47102dc..5ccc9bf5a92 100644 --- a/packetbeat/_meta/config/beat.reference.yml.tmpl +++ b/packetbeat/_meta/config/beat.reference.yml.tmpl @@ -63,6 +63,9 @@ packetbeat.flows: # Set to true to publish fields with null values in events. #keep_null: false + # Overrides where flow events are indexed. + #index: my-custom-flow-index + {{header "Transaction protocols"}} packetbeat.protocols: @@ -73,6 +76,9 @@ packetbeat.protocols: # Set to true to publish fields with null values in events. #keep_null: false + # Overrides where this protocol's events are indexed. + #index: my-custom-icmp-index + - type: amqp # Enable AMQP monitoring. Default: true #enabled: true @@ -113,6 +119,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-amqp-index + - type: cassandra #Cassandra port for traffic monitoring. ports: [9042] @@ -143,6 +152,9 @@ packetbeat.protocols: # This option indicates which Operator/Operators will be ignored. #ignored_ops: ["SUPPORTED","OPTIONS"] + # Overrides where this protocol's events are indexed. + #index: my-custom-cassandra-index + - type: dhcpv4 # Configure the DHCP for IPv4 ports. ports: [67, 68] @@ -183,6 +195,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-dhcpv4-index + - type: http # Enable HTTP monitoring. Default: true #enabled: true @@ -257,6 +272,9 @@ packetbeat.protocols: # be trimmed to this size. Default is 10 MB. #max_message_size: 10485760 + # Overrides where this protocol's events are indexed. + #index: my-custom-http-index + - type: memcache # Enable memcache monitoring. Default: true #enabled: true @@ -309,6 +327,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-memcache-index + - type: mysql # Enable mysql monitoring. Default: true #enabled: true @@ -332,6 +353,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-mysql-index + - type: pgsql # Enable pgsql monitoring. Default: true #enabled: true @@ -355,6 +379,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-pgsql-index + - type: redis # Enable redis monitoring. Default: true #enabled: true @@ -387,6 +414,9 @@ packetbeat.protocols: # large enough to allow for pipelining. #queue_max_messages: 20000 + # Overrides where this protocol's events are indexed. + #index: my-custom-redis-index + - type: thrift # Enable thrift monitoring. Default: true #enabled: true @@ -445,6 +475,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-thrift-index + - type: mongodb # Enable mongodb monitoring. Default: true #enabled: true @@ -478,6 +511,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-mongodb-index + - type: nfs # Enable NFS monitoring. Default: true #enabled: true @@ -501,6 +537,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-nfs-index + - type: tls # Enable TLS monitoring. Default: true #enabled: true @@ -531,6 +570,9 @@ packetbeat.protocols: # Set to true to publish fields with null values in events. #keep_null: false + # Overrides where this protocol's events are indexed. + #index: my-custom-tls-index + - type: sip # Configure the ports where to listen for SIP traffic. You can disable the SIP protocol by commenting out the list of ports. ports: [5060] @@ -544,6 +586,9 @@ packetbeat.protocols: # Preserve original contents in event.original keep_original: true + # Overrides where this protocol's events are indexed. + #index: my-custom-sip-index + {{header "Monitored processes"}} # Packetbeat can enrich events with information about the process associated From 43e719c39fcac4c410b1584ce7975116a91c3ef9 Mon Sep 17 00:00:00 2001 From: Andrew Stucki Date: Tue, 27 Oct 2020 11:10:31 -0400 Subject: [PATCH 4/8] Fix funny merge --- x-pack/packetbeat/packetbeat.reference.yml | 45 ++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/x-pack/packetbeat/packetbeat.reference.yml b/x-pack/packetbeat/packetbeat.reference.yml index 6b936240bbb..da6d176be5a 100644 --- a/x-pack/packetbeat/packetbeat.reference.yml +++ b/x-pack/packetbeat/packetbeat.reference.yml @@ -63,6 +63,9 @@ packetbeat.flows: # Set to true to publish fields with null values in events. #keep_null: false + # Overrides where flow events are indexed. + #index: my-custom-flow-index + # =========================== Transaction protocols ============================ packetbeat.protocols: @@ -73,6 +76,9 @@ packetbeat.protocols: # Set to true to publish fields with null values in events. #keep_null: false + # Overrides where this protocol's events are indexed. + #index: my-custom-icmp-index + - type: amqp # Enable AMQP monitoring. Default: true #enabled: true @@ -113,6 +119,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-amqp-index + - type: cassandra #Cassandra port for traffic monitoring. ports: [9042] @@ -143,6 +152,9 @@ packetbeat.protocols: # This option indicates which Operator/Operators will be ignored. #ignored_ops: ["SUPPORTED","OPTIONS"] + # Overrides where this protocol's events are indexed. + #index: my-custom-cassandra-index + - type: dhcpv4 # Configure the DHCP for IPv4 ports. ports: [67, 68] @@ -183,6 +195,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-dhcpv4-index + - type: http # Enable HTTP monitoring. Default: true #enabled: true @@ -257,6 +272,9 @@ packetbeat.protocols: # be trimmed to this size. Default is 10 MB. #max_message_size: 10485760 + # Overrides where this protocol's events are indexed. + #index: my-custom-http-index + - type: memcache # Enable memcache monitoring. Default: true #enabled: true @@ -309,6 +327,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-memcache-index + - type: mysql # Enable mysql monitoring. Default: true #enabled: true @@ -332,6 +353,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-mysql-index + - type: pgsql # Enable pgsql monitoring. Default: true #enabled: true @@ -355,6 +379,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-pgsql-index + - type: redis # Enable redis monitoring. Default: true #enabled: true @@ -387,6 +414,9 @@ packetbeat.protocols: # large enough to allow for pipelining. #queue_max_messages: 20000 + # Overrides where this protocol's events are indexed. + #index: my-custom-redis-index + - type: thrift # Enable thrift monitoring. Default: true #enabled: true @@ -445,6 +475,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-thrift-index + - type: mongodb # Enable mongodb monitoring. Default: true #enabled: true @@ -478,6 +511,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-mongodb-index + - type: nfs # Enable NFS monitoring. Default: true #enabled: true @@ -501,6 +537,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-nfs-index + - type: tls # Enable TLS monitoring. Default: true #enabled: true @@ -531,6 +570,9 @@ packetbeat.protocols: # Set to true to publish fields with null values in events. #keep_null: false + # Overrides where this protocol's events are indexed. + #index: my-custom-tls-index + - type: sip # Configure the ports where to listen for SIP traffic. You can disable the SIP protocol by commenting out the list of ports. ports: [5060] @@ -544,6 +586,9 @@ packetbeat.protocols: # Preserve original contents in event.original keep_original: true + # Overrides where this protocol's events are indexed. + #index: my-custom-sip-index + # ============================ Monitored processes ============================= # Packetbeat can enrich events with information about the process associated From 3d10bdcdd6b21f0c29fddc850a45e9e113c24991 Mon Sep 17 00:00:00 2001 From: Andrew Stucki Date: Wed, 28 Oct 2020 09:59:21 -0400 Subject: [PATCH 5/8] Incorporate feedback --- CHANGELOG.next.asciidoc | 2 +- libbeat/cfgfile/list.go | 4 ++- packetbeat/beater/packetbeat.go | 25 ++++++++------ packetbeat/beater/processor.go | 32 ++++++++--------- packetbeat/beater/reloader.go | 61 ++++++++++++++++++++++----------- packetbeat/config/agent.go | 47 +++++++++++++------------ packetbeat/config/agent_test.go | 6 +--- packetbeat/config/config.go | 12 +++++-- 8 files changed, 110 insertions(+), 79 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1cb52a0e734..8ffd4ba69de 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -782,7 +782,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add ECS fields for x509 certs, event categorization, and related IP info. {pull}19167[19167] - Add 100-continue support {issue}15830[15830] {pull}19349[19349] - Add initial SIP protocol support {pull}21221[21221] -- Add support for overriding the published index on a per-protoocl/flow basis. {pull}22134[22134] +- Add support for overriding the published index on a per-protocol/flow basis. {pull}22134[22134] - Change build process for x-pack distribution {pull}21979[21979] diff --git a/libbeat/cfgfile/list.go b/libbeat/cfgfile/list.go index 9b62d95f6a9..38193ef5204 100644 --- a/libbeat/cfgfile/list.go +++ b/libbeat/cfgfile/list.go @@ -157,7 +157,9 @@ func (r *RunnerList) Has(hash uint64) bool { // HashConfig hashes a given common.Config func HashConfig(c *common.Config) (uint64, error) { var config map[string]interface{} - c.Unpack(&config) + if err := c.Unpack(&config); err != nil { + return 0, err + } return hashstructure.Hash(config, nil) } diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index 0526e2090d7..390fde04858 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -79,17 +79,17 @@ type packetbeat struct { } func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { - config := initialConfig() - err := rawConfig.Unpack(&config) + cfg := initialConfig() + err := rawConfig.Unpack(&cfg) if err != nil { - logp.Err("fails to read the beat config: %v, %v", err, config) + logp.Err("fails to read the beat config: %v, %v", err, cfg) return nil, err } watcher := procs.ProcessesWatcher{} // Enable the process watcher only if capturing live traffic - if config.Interfaces.File == "" { - err = watcher.Init(config.Procs) + if cfg.Interfaces.File == "" { + err = watcher.Init(cfg.Procs) if err != nil { logp.Critical(err.Error()) return nil, err @@ -101,21 +101,26 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { publisher, err := publish.NewTransactionPublisher( b.Info.Name, b.Publisher, - config.IgnoreOutgoing, - config.Interfaces.File == "", + cfg.IgnoreOutgoing, + cfg.Interfaces.File == "", ) if err != nil { return nil, err } - factory := newProcessorFactory(b.Info.Name, make(chan error, 1), publisher) + configurator := config.NewAgentConfig + if !b.Manager.Enabled() { + configurator = cfg.FromStatic + } + + factory := newProcessorFactory(b.Info.Name, make(chan error, 1), publisher, configurator) if err := factory.CheckConfig(rawConfig); err != nil { return nil, err } return &packetbeat{ config: rawConfig, - shutdownTimeout: config.ShutdownTimeout, + shutdownTimeout: cfg.ShutdownTimeout, factory: factory, publisher: publisher, done: make(chan struct{}), @@ -165,7 +170,7 @@ func (pb *packetbeat) runStatic(b *beat.Beat, factory *processorFactory) error { func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error { runner := newReloader(management.DebugK, factory, b.Publisher) - reload.Register.MustRegister(b.Info.Beat, runner) + reload.Register.MustRegisterList("inputs", runner) defer runner.Stop() logp.Debug("main", "Waiting for the runner to finish") diff --git a/packetbeat/beater/processor.go b/packetbeat/beater/processor.go index dc9dde1cb5f..fe0ada6e624 100644 --- a/packetbeat/beater/processor.go +++ b/packetbeat/beater/processor.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/publisher/pipeline" + "github.com/elastic/beats/v7/packetbeat/config" "github.com/elastic/beats/v7/packetbeat/flows" "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" @@ -63,7 +64,8 @@ func (p *processor) Start() { err := p.sniffer.Run() if err != nil { - p.err <- fmt.Errorf("Sniffer loop failed: %v", err) + p.err <- fmt.Errorf("sniffer loop failed: %v", err) + return } p.err <- nil }() @@ -78,31 +80,25 @@ func (p *processor) Stop() { } type processorFactory struct { - name string - err chan error - publisher *publish.TransactionPublisher + name string + err chan error + publisher *publish.TransactionPublisher + configurator func(*common.Config) (config.Config, error) } -func newProcessorFactory(name string, err chan error, publisher *publish.TransactionPublisher) *processorFactory { +func newProcessorFactory(name string, err chan error, publisher *publish.TransactionPublisher, configurator func(*common.Config) (config.Config, error)) *processorFactory { return &processorFactory{ - name: name, - err: err, - publisher: publisher, + name: name, + err: err, + publisher: publisher, + configurator: configurator, } } func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *common.Config) (cfgfile.Runner, error) { - config := initialConfig() - err := cfg.Unpack(&config) + config, err := p.configurator(cfg) if err != nil { - logp.Err("fails to read the beat config: %v, %v", err, config) - return nil, err - } - - // normalize agent-based configuration - config, err = config.Normalize() - if err != nil { - logp.Err("failed to normalize the beat config: %v, %v", err, config) + logp.Err("Failed to read the beat config: %v, %v", err, config) return nil, err } diff --git a/packetbeat/beater/reloader.go b/packetbeat/beater/reloader.go index 6cbec3c6d1d..583d25102c1 100644 --- a/packetbeat/beater/reloader.go +++ b/packetbeat/beater/reloader.go @@ -18,6 +18,7 @@ package beater import ( + "sort" "sync" "github.com/pkg/errors" @@ -31,12 +32,24 @@ import ( ) type reloader struct { - mutex sync.Mutex - factory cfgfile.RunnerFactory - runner cfgfile.Runner - configHash uint64 - pipeline beat.PipelineConnector - logger *logp.Logger + mutex sync.Mutex + factory cfgfile.RunnerFactory + runner cfgfile.Runner + configHashes []uint64 + pipeline beat.PipelineConnector + logger *logp.Logger +} + +func equalHashes(a, b []uint64) bool { + if len(a) != len(b) { + return false + } + for i, v := range a { + if v != b[i] { + return false + } + } + return true } func newReloader(name string, factory cfgfile.RunnerFactory, pipeline beat.PipelineConnector) *reloader { @@ -55,24 +68,37 @@ func (r *reloader) Stop() { } } -func (r *reloader) Reload(config *reload.ConfigWithMeta) error { +func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error { r.mutex.Lock() defer r.mutex.Unlock() r.logger.Debug("Starting reload procedure") - hash, err := cfgfile.HashConfig(config.Config) + hashes := make([]uint64, len(configs)) + combined := make([]*common.Config, len(configs)) + for i, c := range configs { + combined[i] = c.Config + hash, err := cfgfile.HashConfig(c.Config) + if err != nil { + r.logger.Errorf("Unable to hash given config: %s", err) + return errors.Wrap(err, "unable to hash given config") + } + hashes[i] = hash + } + sort.Slice(hashes, func(i, j int) bool { return hashes[i] < hashes[j] }) + + config, err := common.NewConfigFrom(combined) if err != nil { - r.logger.Errorf("Unable to hash given config: %s", err) - return errors.Wrap(err, "Unable to hash given config") + r.logger.Errorf("Unable to combine configurations: %s", err) + return errors.Wrap(err, "unable to combine configurations") } - if hash == r.configHash { + if equalHashes(hashes, r.configHashes) { // we have the same config reloaded return nil } // reinitialize config hash - r.configHash = 0 + r.configHashes = nil if r.runner != nil { go r.runner.Stop() @@ -80,19 +106,14 @@ func (r *reloader) Reload(config *reload.ConfigWithMeta) error { // reinitialize runner r.runner = nil - c, err := common.NewConfigFrom(config.Config) - if err != nil { - r.logger.Errorf("Unable to create new configuration for factory: %s", err) - return errors.Wrap(err, "Unable to create new configuration for factory") - } - runner, err := r.factory.Create(pipetool.WithDynamicFields(r.pipeline, config.Meta), c) + runner, err := r.factory.Create(pipetool.WithDynamicFields(r.pipeline, nil), config) if err != nil { r.logger.Errorf("Unable to create new runner: %s", err) - return errors.Wrap(err, "Unable to create new runner") + return errors.Wrap(err, "unable to create new runner") } r.logger.Debugf("Starting runner: %s", runner) - r.configHash = hash + r.configHashes = hashes r.runner = runner runner.Start() diff --git a/packetbeat/config/agent.go b/packetbeat/config/agent.go index 3c5c6b70c6f..82df8a2baec 100644 --- a/packetbeat/config/agent.go +++ b/packetbeat/config/agent.go @@ -38,43 +38,48 @@ func defaultDevice() string { return "0" } -// Normalize allows the packetbeat configuration to understand +// NewAgentConfig allows the packetbeat configuration to understand // agent semantics -func (c Config) Normalize() (Config, error) { +func NewAgentConfig(cfg *common.Config) (Config, error) { logp.Debug("agent", "Normalizing agent configuration") - if len(c.Inputs) > 0 { - // override everything, we're managed by agent - c.Flows = nil - c.Protocols = nil - c.ProtocolsList = []*common.Config{} - // TODO: make this configurable rather than just using the default device in - // managed mode - c.Interfaces.Device = defaultDevice() + var configMap []map[string]interface{} + config := Config{ + Interfaces: InterfacesConfig{ + // TODO: make this configurable rather than just using the default device + Device: defaultDevice(), + }, + } + if err := cfg.Unpack(&configMap); err != nil { + return config, err } - for _, input := range c.Inputs { + logp.Debug("agent", fmt.Sprintf("Found %d inputs", len(configMap))) + for _, input := range configMap { if rawInputType, ok := input["type"]; ok { inputType, ok := rawInputType.(string) - if ok && strings.HasPrefix(inputType, "network/") { - config, err := common.NewConfigFrom(input) + if !ok { + return config, fmt.Errorf("invalid input type of: '%T'", rawInputType) + } + logp.Debug("agent", fmt.Sprintf("Found agent configuration for %v", inputType)) + if strings.HasPrefix(inputType, "network/") { + cfg, err := common.NewConfigFrom(input) if err != nil { - return c, err + return config, err } protocol := strings.TrimPrefix(inputType, "network/") - logp.Debug("agent", fmt.Sprintf("Found agent configuration for %v", protocol)) switch protocol { case "flows": - if err := config.Unpack(&c.Flows); err != nil { - return c, err + if err := cfg.Unpack(&config.Flows); err != nil { + return config, err } default: - if err = config.SetString("type", -1, protocol); err != nil { - return c, err + if err = cfg.SetString("type", -1, protocol); err != nil { + return config, err } - c.ProtocolsList = append(c.ProtocolsList, config) + config.ProtocolsList = append(config.ProtocolsList, cfg) } } } } - return c, nil + return config, nil } diff --git a/packetbeat/config/agent_test.go b/packetbeat/config/agent_test.go index eb82bb90f21..0142a41ef30 100644 --- a/packetbeat/config/agent_test.go +++ b/packetbeat/config/agent_test.go @@ -27,7 +27,6 @@ import ( func TestAgentInputNormalization(t *testing.T) { cfg, err := common.NewConfigFrom(` -inputs: - type: network/flows timeout: 10s period: 10s @@ -38,10 +37,7 @@ inputs: data_stream.namespace: default `) require.NoError(t, err) - config := Config{} - require.NoError(t, cfg.Unpack(&config)) - - config, err = config.Normalize() + config, err := NewAgentConfig(cfg) require.NoError(t, err) require.Equal(t, config.Flows.Timeout, "10s") diff --git a/packetbeat/config/config.go b/packetbeat/config/config.go index 1d344787055..9d3818d4aa4 100644 --- a/packetbeat/config/config.go +++ b/packetbeat/config/config.go @@ -34,9 +34,15 @@ type Config struct { Procs procs.ProcsConfig `config:"procs"` IgnoreOutgoing bool `config:"ignore_outgoing"` ShutdownTimeout time.Duration `config:"shutdown_timeout"` +} - // agent configuration - Inputs []map[string]interface{} `config:"inputs"` +// FromStatic initializes a configuration given a common.Config +func (c Config) FromStatic(cfg *common.Config) (Config, error) { + err := cfg.Unpack(&c) + if err != nil { + return c, err + } + return c, nil } // ICMP returns the ICMP configuration @@ -60,7 +66,7 @@ func (c Config) ICMP() (*common.Config, error) { } if icmp != nil { - return nil, errors.New("More then one icmp configurations found") + return nil, errors.New("more than one icmp configuration found") } icmp = cfg From 9539f4ad8a3e79350435fb90e839d2c9fce94847 Mon Sep 17 00:00:00 2001 From: Andrew Stucki Date: Wed, 28 Oct 2020 20:07:27 -0400 Subject: [PATCH 6/8] use streams instead of inputs --- packetbeat/beater/reloader.go | 88 ++------------------------- packetbeat/config/agent.go | 102 ++++++++++++++++++++++++-------- packetbeat/config/agent_test.go | 35 ++++++++--- 3 files changed, 110 insertions(+), 115 deletions(-) diff --git a/packetbeat/beater/reloader.go b/packetbeat/beater/reloader.go index 583d25102c1..da6f8f01aaf 100644 --- a/packetbeat/beater/reloader.go +++ b/packetbeat/beater/reloader.go @@ -18,104 +18,26 @@ package beater import ( - "sort" - "sync" - "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" - "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/reload" - "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/libbeat/publisher/pipetool" ) type reloader struct { - mutex sync.Mutex - factory cfgfile.RunnerFactory - runner cfgfile.Runner - configHashes []uint64 - pipeline beat.PipelineConnector - logger *logp.Logger -} - -func equalHashes(a, b []uint64) bool { - if len(a) != len(b) { - return false - } - for i, v := range a { - if v != b[i] { - return false - } - } - return true + *cfgfile.RunnerList } func newReloader(name string, factory cfgfile.RunnerFactory, pipeline beat.PipelineConnector) *reloader { return &reloader{ - factory: factory, - logger: logp.NewLogger(name), - } -} - -func (r *reloader) Stop() { - r.mutex.Lock() - defer r.mutex.Unlock() - - if r.runner != nil { - r.runner.Stop() + RunnerList: cfgfile.NewRunnerList(name, factory, pipeline), } } func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error { - r.mutex.Lock() - defer r.mutex.Unlock() - - r.logger.Debug("Starting reload procedure") - - hashes := make([]uint64, len(configs)) - combined := make([]*common.Config, len(configs)) - for i, c := range configs { - combined[i] = c.Config - hash, err := cfgfile.HashConfig(c.Config) - if err != nil { - r.logger.Errorf("Unable to hash given config: %s", err) - return errors.Wrap(err, "unable to hash given config") - } - hashes[i] = hash + if len(configs) > 1 { + return errors.New("only a single input is currently supported") } - sort.Slice(hashes, func(i, j int) bool { return hashes[i] < hashes[j] }) - - config, err := common.NewConfigFrom(combined) - if err != nil { - r.logger.Errorf("Unable to combine configurations: %s", err) - return errors.Wrap(err, "unable to combine configurations") - } - - if equalHashes(hashes, r.configHashes) { - // we have the same config reloaded - return nil - } - // reinitialize config hash - r.configHashes = nil - - if r.runner != nil { - go r.runner.Stop() - } - // reinitialize runner - r.runner = nil - - runner, err := r.factory.Create(pipetool.WithDynamicFields(r.pipeline, nil), config) - if err != nil { - r.logger.Errorf("Unable to create new runner: %s", err) - return errors.Wrap(err, "unable to create new runner") - } - - r.logger.Debugf("Starting runner: %s", runner) - r.configHashes = hashes - r.runner = runner - runner.Start() - - return nil + return r.RunnerList.Reload(configs) } diff --git a/packetbeat/config/agent.go b/packetbeat/config/agent.go index 82df8a2baec..30f64630d44 100644 --- a/packetbeat/config/agent.go +++ b/packetbeat/config/agent.go @@ -20,12 +20,25 @@ package config import ( "fmt" "runtime" - "strings" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/go-ucfg" ) +type datastream struct { + Namespace string `config:"namespace"` + Dataset string `config:"dataset"` + Type string `config:"type"` +} + +type agentInput struct { + Type string `config:"type"` + Datastream datastream `config:"data_stream"` + Processors []common.MapStr `config:"processors"` + Streams []map[string]interface{} `config:"streams"` +} + var osDefaultDevices = map[string]string{ "darwin": "en0", "linux": "any", @@ -38,46 +51,87 @@ func defaultDevice() string { return "0" } +func (i agentInput) addProcessorsAndIndex(cfg *common.Config) (*common.Config, error) { + namespace := i.Datastream.Namespace + if namespace == "" { + namespace = "default" + } + datastreamConfig := struct { + Datastream datastream `config:"data_stream"` + }{} + if err := cfg.Unpack(&datastreamConfig); err != nil { + return nil, err + } + mergeConfig, err := common.NewConfigFrom(common.MapStr{ + "index": datastreamConfig.Datastream.Type + "-" + datastreamConfig.Datastream.Dataset + "-" + namespace, + "processors": append([]common.MapStr{ + common.MapStr{ + "add_fields": common.MapStr{ + "target": "data_stream", + "fields": common.MapStr{ + "type": datastreamConfig.Datastream.Type, + "dataset": datastreamConfig.Datastream.Dataset, + "namespace": namespace, + }, + }, + }, + common.MapStr{ + "add_fields": common.MapStr{ + "target": "event", + "fields": common.MapStr{ + "dataset": datastreamConfig.Datastream.Dataset, + }, + }, + }, + }, i.Processors...), + }) + if err != nil { + return nil, err + } + if err := cfg.MergeWithOpts(mergeConfig, ucfg.FieldAppendValues("processors")); err != nil { + return nil, err + } + return cfg, nil +} + // NewAgentConfig allows the packetbeat configuration to understand // agent semantics func NewAgentConfig(cfg *common.Config) (Config, error) { logp.Debug("agent", "Normalizing agent configuration") - var configMap []map[string]interface{} + var input agentInput config := Config{ Interfaces: InterfacesConfig{ // TODO: make this configurable rather than just using the default device Device: defaultDevice(), }, } - if err := cfg.Unpack(&configMap); err != nil { + if err := cfg.Unpack(&input); err != nil { return config, err } - logp.Debug("agent", fmt.Sprintf("Found %d inputs", len(configMap))) - for _, input := range configMap { - if rawInputType, ok := input["type"]; ok { - inputType, ok := rawInputType.(string) + logp.Debug("agent", fmt.Sprintf("Found %d inputs", len(input.Streams))) + for _, stream := range input.Streams { + if rawStreamType, ok := stream["type"]; ok { + streamType, ok := rawStreamType.(string) if !ok { - return config, fmt.Errorf("invalid input type of: '%T'", rawInputType) + return config, fmt.Errorf("invalid input type of: '%T'", rawStreamType) } - logp.Debug("agent", fmt.Sprintf("Found agent configuration for %v", inputType)) - if strings.HasPrefix(inputType, "network/") { - cfg, err := common.NewConfigFrom(input) - if err != nil { + logp.Debug("agent", fmt.Sprintf("Found agent configuration for %v", streamType)) + cfg, err := common.NewConfigFrom(stream) + if err != nil { + return config, err + } + cfg, err = input.addProcessorsAndIndex(cfg) + if err != nil { + return config, err + } + switch streamType { + case "flow": + if err := cfg.Unpack(&config.Flows); err != nil { return config, err } - protocol := strings.TrimPrefix(inputType, "network/") - switch protocol { - case "flows": - if err := cfg.Unpack(&config.Flows); err != nil { - return config, err - } - default: - if err = cfg.SetString("type", -1, protocol); err != nil { - return config, err - } - config.ProtocolsList = append(config.ProtocolsList, cfg) - } + default: + config.ProtocolsList = append(config.ProtocolsList, cfg) } } } diff --git a/packetbeat/config/agent_test.go b/packetbeat/config/agent_test.go index 0142a41ef30..2612423e0cb 100644 --- a/packetbeat/config/agent_test.go +++ b/packetbeat/config/agent_test.go @@ -27,19 +27,38 @@ import ( func TestAgentInputNormalization(t *testing.T) { cfg, err := common.NewConfigFrom(` -- type: network/flows - timeout: 10s - period: 10s - keep_null: false - data_stream.namespace: default -- type: network/amqp - ports: [5672] - data_stream.namespace: default +type: packet +data_stream: + namespace: default +processors: + - add_fields: + target: 'elastic_agent' + fields: + id: agent-id + version: 8.0.0 + snapshot: false +streams: + - type: flow + timeout: 10s + period: 10s + keep_null: false + data_stream: + dataset: packet.flow + type: logs + - type: icmp + data_stream: + dataset: packet.icmp + type: logs `) require.NoError(t, err) config, err := NewAgentConfig(cfg) require.NoError(t, err) require.Equal(t, config.Flows.Timeout, "10s") + require.Equal(t, config.Flows.Index, "logs-packet.flow-default") require.Len(t, config.ProtocolsList, 1) + + var protocol map[string]interface{} + require.NoError(t, config.ProtocolsList[0].Unpack(&protocol)) + require.Len(t, protocol["processors"].([]interface{}), 3) } From 9932ed64d6ae0c99369e4bfef2262f1901942a60 Mon Sep 17 00:00:00 2001 From: Andrew Stucki Date: Thu, 29 Oct 2020 09:29:02 -0400 Subject: [PATCH 7/8] support multiple sniffers --- packetbeat/beater/packetbeat.go | 42 ++++++++-------------------- packetbeat/beater/processor.go | 49 ++++++++++++++++++++++----------- packetbeat/beater/reloader.go | 8 +++--- 3 files changed, 49 insertions(+), 50 deletions(-) diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index 390fde04858..e511c95eb10 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -29,14 +29,22 @@ import ( "github.com/elastic/beats/v7/libbeat/service" "github.com/elastic/beats/v7/packetbeat/config" - "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" - "github.com/elastic/beats/v7/packetbeat/publish" // Add packetbeat default processors _ "github.com/elastic/beats/v7/packetbeat/processor/add_kubernetes_metadata" ) +// this is mainly a limitation to ensure that we never deadlock +// after exiting the main select loop in centrally managed packetbeat +// in order to ensure we don't block on a channel write we make sure +// that the errors channel propagated back from the sniffers has a buffer +// that's equal to the number of sniffers that we can run, that way, if +// exiting and we throw a whole bunch of errors for some reason, each +// sniffer can write out the error even though the main loop has already +// exited with the result of the first error +var maxSniffers = 100 + type flags struct { file *string loop *int @@ -73,7 +81,6 @@ func initialConfig() config.Config { type packetbeat struct { config *common.Config factory *processorFactory - publisher *publish.TransactionPublisher shutdownTimeout time.Duration done chan struct{} } @@ -86,34 +93,12 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { return nil, err } - watcher := procs.ProcessesWatcher{} - // Enable the process watcher only if capturing live traffic - if cfg.Interfaces.File == "" { - err = watcher.Init(cfg.Procs) - if err != nil { - logp.Critical(err.Error()) - return nil, err - } - } else { - logp.Info("Process watcher disabled when file input is used") - } - - publisher, err := publish.NewTransactionPublisher( - b.Info.Name, - b.Publisher, - cfg.IgnoreOutgoing, - cfg.Interfaces.File == "", - ) - if err != nil { - return nil, err - } - configurator := config.NewAgentConfig if !b.Manager.Enabled() { configurator = cfg.FromStatic } - factory := newProcessorFactory(b.Info.Name, make(chan error, 1), publisher, configurator) + factory := newProcessorFactory(b.Info.Name, make(chan error, maxSniffers), b, configurator) if err := factory.CheckConfig(rawConfig); err != nil { return nil, err } @@ -122,7 +107,6 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { config: rawConfig, shutdownTimeout: cfg.ShutdownTimeout, factory: factory, - publisher: publisher, done: make(chan struct{}), }, nil } @@ -136,8 +120,6 @@ func (pb *packetbeat) Run(b *beat.Beat) error { } }() - defer pb.publisher.Stop() - timeout := pb.shutdownTimeout if timeout > 0 { defer time.Sleep(timeout) @@ -181,7 +163,7 @@ func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error return nil case err := <-factory.err: // when we're managed we don't want - // to stop if the sniffer exited without an error + // to stop if the sniffer(s) exited without an error // this would happen during a configuration reload if err != nil { close(pb.done) diff --git a/packetbeat/beater/processor.go b/packetbeat/beater/processor.go index fe0ada6e624..4aa3c35e2e3 100644 --- a/packetbeat/beater/processor.go +++ b/packetbeat/beater/processor.go @@ -36,17 +36,19 @@ import ( ) type processor struct { - wg sync.WaitGroup - flows *flows.Flows - sniffer *sniffer.Sniffer - err chan error + wg sync.WaitGroup + publisher *publish.TransactionPublisher + flows *flows.Flows + sniffer *sniffer.Sniffer + err chan error } -func newProcessor(flows *flows.Flows, sniffer *sniffer.Sniffer, err chan error) *processor { +func newProcessor(publisher *publish.TransactionPublisher, flows *flows.Flows, sniffer *sniffer.Sniffer, err chan error) *processor { return &processor{ - flows: flows, - sniffer: sniffer, - err: err, + publisher: publisher, + flows: flows, + sniffer: sniffer, + err: err, } } @@ -77,20 +79,21 @@ func (p *processor) Stop() { p.flows.Stop() } p.wg.Wait() + p.publisher.Stop() } type processorFactory struct { name string err chan error - publisher *publish.TransactionPublisher + beat *beat.Beat configurator func(*common.Config) (config.Config, error) } -func newProcessorFactory(name string, err chan error, publisher *publish.TransactionPublisher, configurator func(*common.Config) (config.Config, error)) *processorFactory { +func newProcessorFactory(name string, err chan error, beat *beat.Beat, configurator func(*common.Config) (config.Config, error)) *processorFactory { return &processorFactory{ name: name, err: err, - publisher: publisher, + beat: beat, configurator: configurator, } } @@ -102,6 +105,16 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *common.C return nil, err } + publisher, err := publish.NewTransactionPublisher( + p.beat.Info.Name, + p.beat.Publisher, + config.IgnoreOutgoing, + config.Interfaces.File == "", + ) + if err != nil { + return nil, err + } + watcher := procs.ProcessesWatcher{} // Enable the process watcher only if capturing live traffic if config.Interfaces.File == "" { @@ -116,7 +129,7 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *common.C logp.Debug("main", "Initializing protocol plugins") protocols := protos.NewProtocols() - err = protocols.Init(false, p.publisher, watcher, config.Protocols, config.ProtocolsList) + err = protocols.Init(false, publisher, watcher, config.Protocols, config.ProtocolsList) if err != nil { return nil, fmt.Errorf("Initializing protocol analyzers failed: %v", err) } @@ -124,15 +137,19 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *common.C if err != nil { return nil, err } - sniffer, err := setupSniffer(config, protocols, workerFactory(p.publisher, protocols, watcher, flows, config)) + sniffer, err := setupSniffer(config, protocols, workerFactory(publisher, protocols, watcher, flows, config)) if err != nil { return nil, err } - return newProcessor(flows, sniffer, p.err), nil + return newProcessor(publisher, flows, sniffer, p.err), nil } func (p *processorFactory) CheckConfig(config *common.Config) error { - _, err := p.Create(pipeline.NewNilPipeline(), config) - return err + runner, err := p.Create(pipeline.NewNilPipeline(), config) + if err != nil { + return err + } + runner.Stop() + return nil } diff --git a/packetbeat/beater/reloader.go b/packetbeat/beater/reloader.go index da6f8f01aaf..c6925e2fa95 100644 --- a/packetbeat/beater/reloader.go +++ b/packetbeat/beater/reloader.go @@ -18,7 +18,7 @@ package beater import ( - "github.com/pkg/errors" + "fmt" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" @@ -29,15 +29,15 @@ type reloader struct { *cfgfile.RunnerList } -func newReloader(name string, factory cfgfile.RunnerFactory, pipeline beat.PipelineConnector) *reloader { +func newReloader(name string, factory *processorFactory, pipeline beat.PipelineConnector) *reloader { return &reloader{ RunnerList: cfgfile.NewRunnerList(name, factory, pipeline), } } func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error { - if len(configs) > 1 { - return errors.New("only a single input is currently supported") + if len(configs) > maxSniffers { + return fmt.Errorf("only %d inputs are currently supported", maxSniffers) } return r.RunnerList.Reload(configs) } From 95c721d8f83644318d305daacbcef41383f45776 Mon Sep 17 00:00:00 2001 From: Andrew Stucki Date: Thu, 29 Oct 2020 11:28:17 -0400 Subject: [PATCH 8/8] fix shutdown_timeout behavior --- packetbeat/beater/packetbeat.go | 28 +++++++--------------------- packetbeat/beater/processor.go | 30 +++++++++++++++++++----------- 2 files changed, 26 insertions(+), 32 deletions(-) diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index e511c95eb10..d72a98d4a5f 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -79,23 +79,15 @@ func initialConfig() config.Config { // Beater object. Contains all objects needed to run the beat type packetbeat struct { - config *common.Config - factory *processorFactory - shutdownTimeout time.Duration - done chan struct{} + config *common.Config + factory *processorFactory + done chan struct{} } func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { - cfg := initialConfig() - err := rawConfig.Unpack(&cfg) - if err != nil { - logp.Err("fails to read the beat config: %v, %v", err, cfg) - return nil, err - } - configurator := config.NewAgentConfig if !b.Manager.Enabled() { - configurator = cfg.FromStatic + configurator = initialConfig().FromStatic } factory := newProcessorFactory(b.Info.Name, make(chan error, maxSniffers), b, configurator) @@ -104,10 +96,9 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { } return &packetbeat{ - config: rawConfig, - shutdownTimeout: cfg.ShutdownTimeout, - factory: factory, - done: make(chan struct{}), + config: rawConfig, + factory: factory, + done: make(chan struct{}), }, nil } @@ -120,11 +111,6 @@ func (pb *packetbeat) Run(b *beat.Beat) error { } }() - timeout := pb.shutdownTimeout - if timeout > 0 { - defer time.Sleep(timeout) - } - if !b.Manager.Enabled() { return pb.runStatic(b, pb.factory) } diff --git a/packetbeat/beater/processor.go b/packetbeat/beater/processor.go index 4aa3c35e2e3..d6aafbb1a7f 100644 --- a/packetbeat/beater/processor.go +++ b/packetbeat/beater/processor.go @@ -20,6 +20,7 @@ package beater import ( "fmt" "sync" + "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" @@ -36,19 +37,21 @@ import ( ) type processor struct { - wg sync.WaitGroup - publisher *publish.TransactionPublisher - flows *flows.Flows - sniffer *sniffer.Sniffer - err chan error + wg sync.WaitGroup + publisher *publish.TransactionPublisher + flows *flows.Flows + sniffer *sniffer.Sniffer + shutdownTimeout time.Duration + err chan error } -func newProcessor(publisher *publish.TransactionPublisher, flows *flows.Flows, sniffer *sniffer.Sniffer, err chan error) *processor { +func newProcessor(shutdownTimeout time.Duration, publisher *publish.TransactionPublisher, flows *flows.Flows, sniffer *sniffer.Sniffer, err chan error) *processor { return &processor{ - publisher: publisher, - flows: flows, - sniffer: sniffer, - err: err, + publisher: publisher, + flows: flows, + sniffer: sniffer, + err: err, + shutdownTimeout: shutdownTimeout, } } @@ -79,6 +82,11 @@ func (p *processor) Stop() { p.flows.Stop() } p.wg.Wait() + // wait for shutdownTimeout to let the publisher flush + // whatever pending events + if p.shutdownTimeout > 0 { + time.Sleep(p.shutdownTimeout) + } p.publisher.Stop() } @@ -142,7 +150,7 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *common.C return nil, err } - return newProcessor(publisher, flows, sniffer, p.err), nil + return newProcessor(config.ShutdownTimeout, publisher, flows, sniffer, p.err), nil } func (p *processorFactory) CheckConfig(config *common.Config) error {