Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update system/socket dataset to support config reloading #21693

Merged
merged 3 commits into from
Nov 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- system/socket: Fixed a crash due to concurrent map read and write. {issue}21192[21192] {pull}21690[21690]
- file_integrity: stop monitoring excluded paths {issue}21278[21278] {pull}21282[21282]
- auditd: Fix an error condition causing a lot of `audit_send_reply` kernel threads being created. {pull}22673[22673]
- system/socket: Fixed start failure when run under config reloader. {issue}20851[20851] {pull}21693[21693]

*Filebeat*

Expand Down
23 changes: 21 additions & 2 deletions x-pack/auditbeat/module/system/socket/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

package socket

import "time"
import (
"reflect"
"time"
)

// Config defines this metricset's configuration options.
type Config struct {
Expand Down Expand Up @@ -64,11 +67,27 @@ type Config struct {
EnableIPv6 *bool `config:"socket.enable_ipv6"`
}

// Validate validates the host metricset config.
// Validate validates the socket metricset config.
func (c *Config) Validate() error {
return nil
}

// Equals compares two Config objects
func (c *Config) Equals(other Config) bool {
// reflect.DeepEquals() doesn't compare pointed-to values, so strip
// all pointers and then compare them manually.
simpler := [2]Config{*c, other}
for idx := range simpler {
simpler[idx].EnableIPv6 = nil
simpler[idx].TraceFSPath = nil
}
return reflect.DeepEqual(simpler[0], simpler[1]) &&
(c.EnableIPv6 == nil) == (other.EnableIPv6 == nil) &&
(c.EnableIPv6 == nil || *c.EnableIPv6 == *other.EnableIPv6) &&
(c.TraceFSPath == nil) == (other.TraceFSPath == nil) &&
(c.TraceFSPath == nil || *c.TraceFSPath == *other.TraceFSPath)
}

var defaultConfig = Config{
PerfQueueSize: 4096,
LostQueueSize: 128,
Expand Down
2 changes: 1 addition & 1 deletion x-pack/auditbeat/module/system/socket/guess/creds.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const (
)

func init() {
if err := Registry.AddGuess(&guessStructCreds{}); err != nil {
if err := Registry.AddGuess(func() Guesser { return &guessStructCreds{} }); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the changes to guesses are required because, turns out, every guess instance could only run once, as some of them keep internal state (counters, etc.). Replacing the registration with a factory method ensures that when config is reloaded a fresh set of guesses is run.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will update #21011 at some point to reflect this and probably get rid of the global registry for Guesses.

panic(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/auditbeat/module/system/socket/guess/cskxmit6.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
*/

func init() {
if err := Registry.AddGuess(&guessInet6CskXmit{}); err != nil {
if err := Registry.AddGuess(func() Guesser { return &guessInet6CskXmit{} }); err != nil {
panic(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/auditbeat/module/system/socket/guess/deref.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
*/

func init() {
if err := Registry.AddGuess(&guessDeref{}); err != nil {
if err := Registry.AddGuess(func() Guesser { return &guessDeref{} }); err != nil {
panic(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/auditbeat/module/system/socket/guess/inetsock.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
// matched the remote address. This is used by guess_inet_sock6.

func init() {
if err := Registry.AddGuess(&guessInetSockIPv4{}); err != nil {
if err := Registry.AddGuess(func() Guesser { return &guessInetSockIPv4{} }); err != nil {
panic(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/auditbeat/module/system/socket/guess/inetsock6.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ import (
const inetSockDumpSize = 8 * 256

func init() {
if err := Registry.AddGuess(&guessInetSockIPv6{}); err != nil {
if err := Registry.AddGuess(func() Guesser { return &guessInetSockIPv6{} }); err != nil {
panic(err)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import (
const inetSockAfDumpSize = 8 * 16

func init() {
if err := Registry.AddGuess(&guessInetSockFamily{}); err != nil {
if err := Registry.AddGuess(func() Guesser { return &guessInetSockFamily{} }); err != nil {
panic(err)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (
)

func init() {
if err := Registry.AddGuess(&guessIPLocalOut{}); err != nil {
if err := Registry.AddGuess(func() Guesser { return &guessIPLocalOut{} }); err != nil {
panic(err)
}
}
Expand Down
24 changes: 14 additions & 10 deletions x-pack/auditbeat/module/system/socket/guess/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,33 @@ package guess

import "fmt"

// Registry serves as a registration point for guesses.
var Registry = Register{
guesses: make(map[string]Guesser),
}
// GuesserFactory is a factory function for guesses.
type GuesserFactory func() Guesser

// Register stores the registered guesses.
type Register struct {
guesses map[string]Guesser
factories map[string]GuesserFactory
}

// Registry serves as a registration point for guesses.
var Registry = Register{
factories: make(map[string]GuesserFactory),
}

// AddGuess registers a new guess.
func (r *Register) AddGuess(guess Guesser) error {
if _, found := r.guesses[guess.Name()]; found {
func (r *Register) AddGuess(factory GuesserFactory) error {
guess := factory()
if _, found := r.factories[guess.Name()]; found {
return fmt.Errorf("guess %s is duplicated", guess.Name())
}
r.guesses[guess.Name()] = guess
r.factories[guess.Name()] = factory
return nil
}

// GetList returns a list of registered guesses.
func (r *Register) GetList() (list []Guesser) {
for _, guess := range r.guesses {
list = append(list, guess)
for _, factory := range r.factories {
list = append(list, factory())
}
return list
}
6 changes: 3 additions & 3 deletions x-pack/auditbeat/module/system/socket/guess/skbuff.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ import (
const maxSafePayload = 508

func init() {
if err := Registry.AddGuess(&guessSkBuffLen{}); err != nil {
if err := Registry.AddGuess(func() Guesser { return &guessSkBuffLen{} }); err != nil {
panic(err)
}
if err := Registry.AddGuess(&guessSkBuffProto{}); err != nil {
if err := Registry.AddGuess(func() Guesser { return &guessSkBuffProto{} }); err != nil {
panic(err)
}
if err := Registry.AddGuess(&guessSkBuffDataPtr{}); err != nil {
if err := Registry.AddGuess(func() Guesser { return &guessSkBuffDataPtr{} }); err != nil {
panic(err)
}
}
Expand Down
3 changes: 1 addition & 2 deletions x-pack/auditbeat/module/system/socket/guess/sockaddrin.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ import (
*/

func init() {
if err := Registry.AddGuess(
&guessSockaddrIn{}); err != nil {
if err := Registry.AddGuess(func() Guesser { return &guessSockaddrIn{} }); err != nil {
panic(err)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ import (
*/

func init() {
if err := Registry.AddGuess(
&guessSockaddrIn6{}); err != nil {
if err := Registry.AddGuess(func() Guesser { return &guessSockaddrIn6{} }); err != nil {
panic(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/auditbeat/module/system/socket/guess/socketsk.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
// "SOCKET_SOCK": 32

func init() {
if err := Registry.AddGuess(&guessSocketSock{}); err != nil {
if err := Registry.AddGuess(func() Guesser { return &guessSocketSock{} }); err != nil {
panic(err)
}
}
Expand Down
6 changes: 4 additions & 2 deletions x-pack/auditbeat/module/system/socket/guess/syscallargs.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ import (
*/

func init() {
if err := Registry.AddGuess(&guessSyscallArgs{
expected: [2]uintptr{^uintptr(0x11111111), ^uintptr(0x22222222)},
if err := Registry.AddGuess(func() Guesser {
return &guessSyscallArgs{
expected: [2]uintptr{^uintptr(0x11111111), ^uintptr(0x22222222)},
}
}); err != nil {
panic(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
// TCP_SENDMSG_LEN : +4(%sp)

func init() {
if err := Registry.AddGuess(&guessTCPSendMsg{}); err != nil {
if err := Registry.AddGuess(func() Guesser { return &guessTCPSendMsg{} }); err != nil {
panic(err)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
// TCP_SENDMSG_SOCK : %di

func init() {
if err := Registry.AddGuess(&guessTcpSendmsgSock{}); err != nil {
if err := Registry.AddGuess(func() Guesser { return &guessTcpSendmsgSock{} }); err != nil {
panic(err)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
// UDP_SENDMSG_MSG: $stack3

func init() {
if err := Registry.AddGuess(&guessUDPSendMsg{}); err != nil {
if err := Registry.AddGuess(func() Guesser { return &guessUDPSendMsg{} }); err != nil {
panic(err)
}
}
Expand Down
37 changes: 32 additions & 5 deletions x-pack/auditbeat/module/system/socket/socket_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
Expand Down Expand Up @@ -73,6 +74,7 @@ type MetricSet struct {
mountedFS *mountPoint
isDebug bool
isDetailed bool
terminated sync.WaitGroup
}

func init() {
Expand All @@ -86,20 +88,45 @@ func init() {
}
}

var (
// Singleton to instantiate one socket dataset at a time.
instance *MetricSet
instanceMutex sync.Mutex
)

// New constructs a new MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
cfgwarn.Beta("The %s dataset is beta.", fullName)
instanceMutex.Lock()
defer instanceMutex.Unlock()

config := defaultConfig
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, errors.Wrapf(err, "failed to unpack the %s config", fullName)
}
if instance != nil {
// Do not instantiate a new dataset if the config hasn't changed.
// This is necessary when run under config reloader even though the
// reloader itself already checks the config for changes, because
// the first time it runs it will allocate two consecutive instances
// (one for checking the config, one for running). This saves
// running the guesses twice on startup.
if config.Equals(instance.config) {
return instance, nil
}
instance.terminated.Wait()
}
var err error
instance, err = newSocketMetricset(config, base)
return instance, err
}

func newSocketMetricset(config Config, base mb.BaseMetricSet) (*MetricSet, error) {
cfgwarn.Beta("The %s dataset is beta.", fullName)
logger := logp.NewLogger(metricsetName)
sniffer, err := dns.NewSniffer(base, logger)
if err != nil {
return nil, errors.Wrap(err, "unable to create DNS sniffer")
}

ms := &MetricSet{
SystemMetricSet: system.NewSystemMetricSet(base),
templateVars: make(common.MapStr),
Expand All @@ -110,18 +137,19 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
isDetailed: logp.HasSelector(detailSelector),
sniffer: sniffer,
}

// Setup the metricset before Run() so that startup can be halted in case of
// error.
if err := ms.Setup(); err != nil {
if err = ms.Setup(); err != nil {
return nil, errors.Wrapf(err, "%s dataset setup failed", fullName)
}
return ms, nil
}

// Run the metricset. This will loop until the passed reporter is cancelled.
func (m *MetricSet) Run(r mb.PushReporterV2) {
m.terminated.Add(1)
defer m.log.Infof("%s terminated.", fullName)
defer m.terminated.Done()
defer m.Cleanup()

st := NewState(r,
Expand Down Expand Up @@ -235,7 +263,6 @@ func (m *MetricSet) Setup() (err error) {
//
var traceFS *tracing.TraceFS
if m.config.TraceFSPath == nil {

if err := tracing.IsTraceFSAvailable(); err != nil {
m.log.Debugf("tracefs/debugfs not found. Attempting to mount")
for _, mount := range defaultMounts {
Expand Down