From 12c386c0b8d281b7357603a0114a973d09881801 Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Tue, 21 Sep 2021 10:42:17 -0700 Subject: [PATCH] Osquerybeat: Configuration change to allow support of osquery configuration features 1:1 (#27900) * Osquerybeat: Configuration change to allow support of osquery configuration features 1:1 * Cleaner osqueryd runner * Improve restart * Route ad-hoc queries results to the same namespaced index as scheduled queries * Unit test coverage for osquery runner --- x-pack/osquerybeat/beater/action_handler.go | 16 +- x-pack/osquerybeat/beater/config_plugin.go | 201 ++++++++++++------ .../osquerybeat/beater/config_plugin_test.go | 14 +- x-pack/osquerybeat/beater/osquery_runner.go | 129 +++++++++++ .../osquerybeat/beater/osquery_runner_test.go | 95 +++++++++ x-pack/osquerybeat/beater/osquerybeat.go | 55 ++--- x-pack/osquerybeat/internal/config/config.go | 24 ++- x-pack/osquerybeat/internal/config/osquery.go | 72 +++++++ x-pack/osquerybeat/internal/osqd/args.go | 101 +++++++++ x-pack/osquerybeat/internal/osqd/args_test.go | 52 +++++ x-pack/osquerybeat/internal/osqd/osqueryd.go | 101 ++++----- 11 files changed, 689 insertions(+), 171 deletions(-) create mode 100644 x-pack/osquerybeat/beater/osquery_runner.go create mode 100644 x-pack/osquerybeat/beater/osquery_runner_test.go create mode 100644 x-pack/osquerybeat/internal/config/osquery.go create mode 100644 x-pack/osquerybeat/internal/osqd/args.go create mode 100644 x-pack/osquerybeat/internal/osqd/args_test.go diff --git a/x-pack/osquerybeat/beater/action_handler.go b/x-pack/osquerybeat/beater/action_handler.go index 9eb443df6ef..2acba80ca4b 100644 --- a/x-pack/osquerybeat/beater/action_handler.go +++ b/x-pack/osquerybeat/beater/action_handler.go @@ -29,11 +29,16 @@ type queryExecutor interface { Query(ctx context.Context, sql string) ([]map[string]interface{}, error) } +type namespaceProvider interface { + GetNamespace() string +} + type actionHandler struct { log *logp.Logger inputType string publisher publisher queryExec queryExecutor + np namespaceProvider } func (a *actionHandler) Name() string { @@ -63,7 +68,16 @@ func (a *actionHandler) execute(ctx context.Context, req map[string]interface{}) if err != nil { return fmt.Errorf("%v: %w", err, ErrQueryExecution) } - return a.executeQuery(ctx, config.Datastream(config.DefaultNamespace), ac, "", req) + + var namespace string + if a.np != nil { + namespace = a.np.GetNamespace() + } + if namespace == "" { + namespace = config.DefaultNamespace + } + + return a.executeQuery(ctx, config.Datastream(namespace), ac, "", req) } func (a *actionHandler) executeQuery(ctx context.Context, index string, ac action.Action, responseID string, req map[string]interface{}) error { diff --git a/x-pack/osquerybeat/beater/config_plugin.go b/x-pack/osquerybeat/beater/config_plugin.go index bad584e32d6..c8374d0fa33 100644 --- a/x-pack/osquerybeat/beater/config_plugin.go +++ b/x-pack/osquerybeat/beater/config_plugin.go @@ -6,7 +6,6 @@ package beater import ( "context" - "encoding/json" "errors" "fmt" "strings" @@ -18,9 +17,9 @@ import ( ) const ( - configName = "osq_config" - scheduleSplayPercent = 10 - maxECSMappingDepth = 25 // Max ECS dot delimited key path, that is sufficient for the current ECS mapping + configName = "osq_config" + defaultScheduleSplayPercent = 10 + maxECSMappingDepth = 25 // Max ECS dot delimited key path, that is sufficient for the current ECS mapping keyField = "field" keyValue = "value" @@ -32,8 +31,8 @@ var ( ) type QueryInfo struct { - QueryConfig query - ECSMapping ecs.Mapping + Query string + ECSMapping ecs.Mapping } type queryInfoMap map[string]QueryInfo @@ -61,11 +60,15 @@ type ConfigPlugin struct { // we could be sending data into the datastream with namespace that we don't have permissions meanwhile namespaces map[string]string - // Packs - packs map[string]pack + // Osquery configuration + osqueryConfig *config.OsqueryConfig // Raw config bytes cached configString string + + // One common namespace from the first input as of 7.16 + // This is used to ad-hoc queries results over GetNamespace API + namespace string } func NewConfigPlugin(log *logp.Logger) *ConfigPlugin { @@ -85,6 +88,9 @@ func (p *ConfigPlugin) Set(inputs []config.InputConfig) error { } func (p *ConfigPlugin) Count() int { + p.mx.RLock() + defer p.mx.RUnlock() + return p.queriesCount } @@ -102,6 +108,12 @@ func (p *ConfigPlugin) LookupNamespace(name string) (ns string, ok bool) { return ns, ok } +func (p *ConfigPlugin) GetNamespace() string { + p.mx.RLock() + defer p.mx.RUnlock() + return p.namespace +} + func (p *ConfigPlugin) GenerateConfig(ctx context.Context) (map[string]string, error) { p.log.Debug("configPlugin GenerateConfig is called") @@ -119,47 +131,30 @@ func (p *ConfigPlugin) GenerateConfig(ctx context.Context) (map[string]string, e p.newQueryInfoMap = nil } + p.log.Info("Osqueryd configuration:", c) + return map[string]string{ configName: c, }, nil } -type query struct { - Query string `json:"query"` - Interval int `json:"interval,omitempty"` - Platform string `json:"platform,omitempty"` - Version string `json:"version,omitempty"` - Snapshot bool `json:"snapshot,omitempty"` -} - -type pack struct { - Discovery []string `json:"discovery,omitempty"` - Platform string `json:"platform,omitempty"` - Version string `json:"version,omitempty"` - Queries map[string]query `json:"queries,omitempty"` -} - -type osqueryConfig struct { - Options map[string]interface{} `json:"options"` - Packs map[string]pack `json:"packs,omitempty"` -} - -func newOsqueryConfig(packs map[string]pack) osqueryConfig { - return osqueryConfig{ - Options: map[string]interface{}{ - "schedule_splay_percent": scheduleSplayPercent, - }, - Packs: packs, +func newOsqueryConfig(osqueryConfig *config.OsqueryConfig) *config.OsqueryConfig { + if osqueryConfig == nil { + osqueryConfig = &config.OsqueryConfig{} } -} - -func (c osqueryConfig) render() ([]byte, error) { - return json.MarshalIndent(c, "", " ") + if osqueryConfig.Options == nil { + osqueryConfig.Options = make(map[string]interface{}) + } + const scheduleSplayPercentKey = "schedule_splay_percent" + if _, ok := osqueryConfig.Options[scheduleSplayPercentKey]; !ok { + osqueryConfig.Options[scheduleSplayPercentKey] = defaultScheduleSplayPercent + } + return osqueryConfig } func (p *ConfigPlugin) render() (string, error) { if p.configString == "" { - raw, err := newOsqueryConfig(p.packs).render() + raw, err := newOsqueryConfig(p.osqueryConfig).Render() if err != nil { return "", err } @@ -169,57 +164,129 @@ func (p *ConfigPlugin) render() (string, error) { return p.configString, nil } -func (p *ConfigPlugin) set(inputs []config.InputConfig) error { - var err error +func (p *ConfigPlugin) set(inputs []config.InputConfig) (err error) { p.configString = "" + p.namespace = "" + queriesCount := 0 + osqueryConfig := &config.OsqueryConfig{} newQueryInfoMap := make(map[string]QueryInfo) namespaces := make(map[string]string) - p.packs = make(map[string]pack) + + // Set the members if no errors + defer func() { + if err != nil { + return + } + p.osqueryConfig = osqueryConfig + p.newQueryInfoMap = newQueryInfoMap + p.namespaces = namespaces + p.queriesCount = queriesCount + }() + + // Return if no inputs, all the members will be reset by deferred call above + if len(inputs) == 0 { + return nil + } + + // Read namespace from the first input as of 7.16 + p.namespace = inputs[0].Datastream.Namespace + if p.namespace == "" { + p.namespace = config.DefaultNamespace + } + + // Since 7.16 version only one integration/input is expected + // The inputs[0].Osquery can be nil if this is pre 7.16 integration configuration + if inputs[0].Osquery != nil { + osqueryConfig = inputs[0].Osquery + } + + // Common code to register query with lookup maps, enforce snapshot and increment queries count + registerQuery := func(name, ns string, qi config.Query) (config.Query, error) { + var ecsm ecs.Mapping + ecsm, err = flattenECSMapping(qi.ECSMapping) + if err != nil { + return qi, err + } + + newQueryInfoMap[name] = QueryInfo{ + Query: qi.Query, + ECSMapping: ecsm, + } + namespaces[name] = p.namespace + queriesCount++ + + qi.Snapshot = true + return qi, nil + } + + // Iterate osquery configuration's scheduled queries, add flattened ECS mappings to lookup map + for name, qi := range osqueryConfig.Schedule { + qi, err = registerQuery(name, p.namespace, qi) + if err != nil { + return err + } + osqueryConfig.Schedule[name] = qi + } + + // Iterate osquery configuration's packs queries, add flattened ECS mappings to lookup map + for packName, pack := range osqueryConfig.Packs { + for name, qi := range pack.Queries { + qi, err = registerQuery(getPackQueryName(packName, name), p.namespace, qi) + if err != nil { + return err + } + pack.Queries[name] = qi + } + } + + // Iterate inputs for Osquery configuration for backwards compatibility for _, input := range inputs { - pack := pack{ - Queries: make(map[string]query), + pack := config.Pack{ + Queries: make(map[string]config.Query), Platform: input.Platform, Version: input.Version, Discovery: input.Discovery, } for _, stream := range input.Streams { - id := "pack_" + input.Name + "_" + stream.ID - query := query{ - Query: stream.Query, - Interval: stream.Interval, - Platform: stream.Platform, - Version: stream.Version, - Snapshot: true, // enforce snapshot for all queries + qi := config.Query{ + Query: stream.Query, + Interval: stream.Interval, + Platform: stream.Platform, + Version: stream.Version, + ECSMapping: stream.ECSMapping, } - var ecsm ecs.Mapping - if len(stream.ECSMapping) > 0 { - ecsm, err = flattenECSMapping(stream.ECSMapping) - if err != nil { - return err - } + + qi, err = registerQuery(getPackQueryName(input.Name, stream.ID), p.namespace, qi) + if err != nil { + return err } - newQueryInfoMap[id] = QueryInfo{ - QueryConfig: query, - ECSMapping: ecsm, + pack.Queries[stream.ID] = qi + } + + if len(pack.Queries) != 0 { + if osqueryConfig.Packs == nil { + osqueryConfig.Packs = make(map[string]config.Pack) } - namespaces[id] = input.Datastream.Namespace - pack.Queries[stream.ID] = query - queriesCount++ + osqueryConfig.Packs[input.Name] = pack } - p.packs[input.Name] = pack } - p.newQueryInfoMap = newQueryInfoMap - p.namespaces = namespaces - p.queriesCount = queriesCount + return nil } +func getPackQueryName(packName, queryName string) string { + return "pack_" + packName + "_" + queryName +} + // Due to current configuration passing between the agent and beats the keys that contain dots (".") // are split into the nested tree-like structure. // This converts this dynamic map[string]interface{} tree into strongly typed flat map. func flattenECSMapping(m map[string]interface{}) (ecs.Mapping, error) { + if m == nil { + return nil, nil + } ecsm := make(ecs.Mapping) for k, v := range m { if strings.TrimSpace(k) == "" { diff --git a/x-pack/osquerybeat/beater/config_plugin_test.go b/x-pack/osquerybeat/beater/config_plugin_test.go index d38e6cc8fba..c5a76eaf05c 100644 --- a/x-pack/osquerybeat/beater/config_plugin_test.go +++ b/x-pack/osquerybeat/beater/config_plugin_test.go @@ -21,16 +21,16 @@ import ( ) func renderFullConfigJSON(inputs []config.InputConfig) (string, error) { - packs := make(map[string]pack) + packs := make(map[string]config.Pack) for _, input := range inputs { - pack := pack{ + pack := config.Pack{ Platform: input.Platform, Version: input.Version, Discovery: input.Discovery, - Queries: make(map[string]query), + Queries: make(map[string]config.Query), } for _, stream := range input.Streams { - query := query{ + query := config.Query{ Query: stream.Query, Interval: stream.Interval, Platform: stream.Platform, @@ -41,7 +41,9 @@ func renderFullConfigJSON(inputs []config.InputConfig) (string, error) { } packs[input.Name] = pack } - raw, err := newOsqueryConfig(packs).render() + raw, err := newOsqueryConfig(&config.OsqueryConfig{ + Packs: packs, + }).Render() if err != nil { return "", err } @@ -391,7 +393,7 @@ func TestSet(t *testing.T) { if !ok { t.Fatalf("failed to resolve name %v", name) } - diff = cmp.Diff(qi.QueryConfig.Query, stream.Query) + diff = cmp.Diff(qi.Query, stream.Query) if diff != "" { t.Error(diff) } diff --git a/x-pack/osquerybeat/beater/osquery_runner.go b/x-pack/osquerybeat/beater/osquery_runner.go new file mode 100644 index 00000000000..a1c890a559d --- /dev/null +++ b/x-pack/osquerybeat/beater/osquery_runner.go @@ -0,0 +1,129 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package beater + +import ( + "context" + "errors" + "sync" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/osquerybeat/internal/config" + "github.com/elastic/beats/v7/x-pack/osquerybeat/internal/osqd" +) + +type osqueryRunner struct { + log *logp.Logger + inputCh chan []config.InputConfig +} + +func newOsqueryRunner(log *logp.Logger) *osqueryRunner { + r := &osqueryRunner{ + log: log, + inputCh: make(chan []config.InputConfig, 1), + } + return r +} + +type osqueryRunFunc func(ctx context.Context, flags osqd.Flags, inputCh <-chan []config.InputConfig) error + +// Run manages osqueryd lifecycle, processes inputs changes, restarts osquery if needed +func (r *osqueryRunner) Run(parentCtx context.Context, runfn osqueryRunFunc) error { + var ( + flags osqd.Flags + + ctx context.Context + cn context.CancelFunc + wg sync.WaitGroup + + inputCh chan []config.InputConfig + ) + + var mx sync.Mutex + cancel := func() { + mx.Lock() + if cn != nil { + cn() + cn = nil + } + defer mx.Unlock() + } + + // Cleanup on exit + defer cancel() + + errCh := make(chan error, 1) + + process := func(inputs []config.InputConfig) { + newFlags := config.GetOsqueryOptions(inputs) + + // If Osqueryd is running and flags are different: stop osquery + if cn != nil && !osqd.FlagsAreSame(flags, newFlags) { + r.log.Info("Osquery is running and options changed, stop osqueryd") + + // Cancel context + cancel() + + // Wait until osquery runner exists + wg.Wait() + + // Set the flags to use + flags = newFlags + } + + // Start osqueryd if not running + if cn == nil { + r.log.Info("Start osqueryd") + inputCh = make(chan []config.InputConfig, 1) + ctx, cn = context.WithCancel(parentCtx) + + wg.Add(1) + go func() { + defer wg.Done() + err := runfn(ctx, flags, inputCh) + + // Reset cancellable + cancel() + + // Forward error to main loop + select { + case errCh <- err: + case <-ctx.Done(): + } + }() + } + + select { + case inputCh <- inputs: + case <-ctx.Done(): + } + } + + for { + select { + case inputs := <-r.inputCh: + r.log.Debug("Got configuration update") + process(inputs) + case err := <-errCh: + if err == nil || errors.Is(err, context.Canceled) { + r.log.Info("Osquery exited: ", err) + } else { + r.log.Error("Failed to run osquery:", err) + return err + } + case <-parentCtx.Done(): + return parentCtx.Err() + } + } +} + +func (r *osqueryRunner) Update(ctx context.Context, inputs []config.InputConfig) error { + select { + case <-ctx.Done(): + return ctx.Err() + case r.inputCh <- inputs: + } + return nil +} diff --git a/x-pack/osquerybeat/beater/osquery_runner_test.go b/x-pack/osquerybeat/beater/osquery_runner_test.go new file mode 100644 index 00000000000..5aed843869d --- /dev/null +++ b/x-pack/osquerybeat/beater/osquery_runner_test.go @@ -0,0 +1,95 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package beater + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/osquerybeat/internal/config" + "github.com/elastic/beats/v7/x-pack/osquerybeat/internal/osqd" + "golang.org/x/sync/errgroup" +) + +func waitGroupWithTimeout(ctx context.Context, g *errgroup.Group, to time.Duration) error { + + errCh := make(chan error, 1) + + go func() { + err := g.Wait() + errCh <- err + }() + + ctx, cn := context.WithDeadline(ctx, time.Now().Add(to)) + defer cn() + + select { + case err := <-errCh: + return err + case <-ctx.Done(): + return ctx.Err() + } +} + +func waitForStart(ctx context.Context, runCh <-chan struct{}, to time.Duration) error { + ctx, cn := context.WithDeadline(ctx, time.Now().Add(to)) + defer cn() + + select { + case <-runCh: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func TestOsqueryRunnerCancellable(t *testing.T) { + to := 10 * time.Second + + parentCtx := context.Background() + logger := logp.NewLogger("osquery_runner") + + runCh := make(chan struct{}, 1) + + runfn := func(ctx context.Context, flags osqd.Flags, inputCh <-chan []config.InputConfig) error { + runCh <- struct{}{} + <-ctx.Done() + return nil + } + + ctx, cn := context.WithCancel(parentCtx) + defer cn() + + g, ctx := errgroup.WithContext(ctx) + + // Start runner + runner := newOsqueryRunner(logger) + g.Go(func() error { + return runner.Run(ctx, runfn) + }) + + // Sent input that will start the runner function + runner.Update(ctx, nil) + + // Wait for runner start + err := waitForStart(ctx, runCh, to) + if err != nil { + t.Fatal("failed starting:", err) + } + + // Cancel + go func() { + cn() + }() + + // Wait for runner stop + er := waitGroupWithTimeout(parentCtx, g, to) + if er != nil && !errors.Is(er, context.Canceled) { + t.Fatal("failed running:", er) + } +} diff --git a/x-pack/osquerybeat/beater/osquerybeat.go b/x-pack/osquerybeat/beater/osquerybeat.go index 5e5923bc909..2a658aefffd 100644 --- a/x-pack/osquerybeat/beater/osquerybeat.go +++ b/x-pack/osquerybeat/beater/osquerybeat.go @@ -151,32 +151,22 @@ func (bt *osquerybeat) Run(b *beat.Beat) error { } g, ctx := errgroup.WithContext(ctx) - var inputCh chan []config.InputConfig - - startOsqueryIfNotStarted := func() { - // Start only once - if inputCh == nil { - inputCh = make(chan []config.InputConfig, 1) - g.Go(func() error { - err := bt.runOsquery(ctx, b, osq, inputCh) - if err != nil { - if errors.Is(err, context.Canceled) { - bt.log.Errorf("Osquery exited: %v", err) - } else { - bt.log.Errorf("Failed to run osquery: %v", err) - } - } - return err - }) - } - } + + // Start osquery runner. + // It restarts osquery on configuration options change + // It exits if osqueryd fails to run for any reason, like a bad configuration for example + runner := newOsqueryRunner(bt.log) + g.Go(func() error { + return runner.Run(ctx, func(ctx context.Context, flags osqd.Flags, inputCh <-chan []config.InputConfig) error { + return bt.runOsquery(ctx, b, osq, flags, inputCh) + }) + }) // Start osquery only if config has inputs, otherwise it will be started on the first configuration sent from the agent // This way we don't need to persist the configuration for configuration plugin, because osquery is not running until // we have the first valid configuration if len(bt.config.Inputs) > 0 { - startOsqueryIfNotStarted() - inputCh <- bt.config.Inputs + runner.Update(ctx, bt.config.Inputs) } // Set the osquery beat version to the manager payload. This allows the bundled osquery version to be reported to the stack. @@ -184,7 +174,6 @@ func (bt *osquerybeat) Run(b *beat.Beat) error { // Run main loop g.Go(func() error { - // Configure publisher from initial input err := bt.pub.Configure(bt.config.Inputs) if err != nil { @@ -202,8 +191,7 @@ func (bt *osquerybeat) Run(b *beat.Beat) error { bt.log.Errorf("Failed to connect beat publisher client, err: %v", err) return err } - startOsqueryIfNotStarted() - inputCh <- inputConfigs + runner.Update(ctx, inputConfigs) } } }) @@ -212,7 +200,7 @@ func (bt *osquerybeat) Run(b *beat.Beat) error { return g.Wait() } -func (bt *osquerybeat) runOsquery(ctx context.Context, b *beat.Beat, osq *osqd.OSQueryD, inputCh <-chan []config.InputConfig) error { +func (bt *osquerybeat) runOsquery(ctx context.Context, b *beat.Beat, osq *osqd.OSQueryD, flags osqd.Flags, inputCh <-chan []config.InputConfig) error { socketPath := osq.SocketPath() // Create a cache for queries types resolution @@ -225,7 +213,7 @@ func (bt *osquerybeat) runOsquery(ctx context.Context, b *beat.Beat, osq *osqd.O // Start osqueryd g, ctx := errgroup.WithContext(ctx) g.Go(func() error { - err := osq.Run(ctx) + err := osq.Run(ctx, flags) if err != nil { if errors.Is(err, context.Canceled) { bt.log.Errorf("Osqueryd exited: %v", err) @@ -273,7 +261,7 @@ func (bt *osquerybeat) runOsquery(ctx context.Context, b *beat.Beat, osq *osqd.O }) // Register action handler - ah := bt.registerActionHandler(b, cli) + ah := bt.registerActionHandler(b, cli, configPlugin) defer bt.unregisterActionHandler(b, ah) // Process input @@ -315,12 +303,8 @@ func runExtensionServer(ctx context.Context, socketPath string, configPlugin *Co // Run extension server shutdown goroutine, otherwise it waits for ping failure g.Go(func() error { - for { - select { - case <-ctx.Done(): - return extserver.Shutdown(context.Background()) - } - } + <-ctx.Done() + return extserver.Shutdown(context.Background()) }) return g.Wait() @@ -341,7 +325,7 @@ func (bt *osquerybeat) handleSnapshotResult(ctx context.Context, cli *osqdcli.Cl return } - hits, err := cli.ResolveResult(ctx, qi.QueryConfig.Query, res.Hits) + hits, err := cli.ResolveResult(ctx, qi.Query, res.Hits) if err != nil { bt.log.Errorf("failed to resolve query result types: %s", res.Name) return @@ -364,7 +348,7 @@ func (bt *osquerybeat) Stop() { bt.close() } -func (bt *osquerybeat) registerActionHandler(b *beat.Beat, cli *osqdcli.Client) *actionHandler { +func (bt *osquerybeat) registerActionHandler(b *beat.Beat, cli *osqdcli.Client, configPlugin *ConfigPlugin) *actionHandler { if b.Manager == nil { return nil } @@ -374,6 +358,7 @@ func (bt *osquerybeat) registerActionHandler(b *beat.Beat, cli *osqdcli.Client) inputType: osqueryInputType, publisher: bt.pub, queryExec: cli, + np: configPlugin, } b.Manager.RegisterAction(ah) return ah diff --git a/x-pack/osquerybeat/internal/config/config.go b/x-pack/osquerybeat/internal/config/config.go index 3b972352a7a..c80d64603bc 100644 --- a/x-pack/osquerybeat/internal/config/config.go +++ b/x-pack/osquerybeat/internal/config/config.go @@ -40,11 +40,16 @@ type InputConfig struct { Name string `config:"name"` Type string `config:"type"` Datastream DatastreamConfig `config:"data_stream"` // Datastream configuration - Streams []StreamConfig `config:"streams"` Processors processors.PluginConfig `config:"processors"` - Platform string `config:"iplatform"` // restrict all queries to a given platform, default is 'all' platforms; you may use commas to set multiple platforms - Version string `config:"iversion"` // only run the queries with osquery versions greater than or equal-to this version string - Discovery []string `config:"discovery"` // a list of discovery queries https://osquery.readthedocs.io/en/stable/deployment/configuration/#discovery-queries + + // Full Osquery configuration + Osquery *OsqueryConfig `config:"osquery"` + + // Deprecated + Streams []StreamConfig `config:"streams"` + Platform string `config:"iplatform"` // restrict all queries to a given platform, default is 'all' platforms; you may use commas to set multiple platforms + Version string `config:"iversion"` // only run the queries with osquery versions greater than or equal-to this version string + Discovery []string `config:"discovery"` // a list of discovery queries https://osquery.readthedocs.io/en/stable/deployment/configuration/#discovery-queries } type Config struct { @@ -59,3 +64,14 @@ func Datastream(namespace string) string { } return datastreamPrefix + namespace } + +// GetOsqueryOptions Returns options from the first input if available +func GetOsqueryOptions(inputs []InputConfig) map[string]interface{} { + if len(inputs) == 0 { + return nil + } + if inputs[0].Osquery == nil { + return nil + } + return inputs[0].Osquery.Options +} diff --git a/x-pack/osquerybeat/internal/config/osquery.go b/x-pack/osquerybeat/internal/config/osquery.go new file mode 100644 index 00000000000..6db2ac0ca13 --- /dev/null +++ b/x-pack/osquerybeat/internal/config/osquery.go @@ -0,0 +1,72 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package config + +import ( + "encoding/json" +) + +type Query struct { + Query string `config:"query" json:"query"` + Interval int `config:"interval" json:"interval"` + Platform string `config:"platform" json:"platform,omitempty"` + Version string `config:"version" json:"version,omitempty"` + Shard int `config:"shard" json:"shard,omitempty"` + Description int `config:"description" json:"description,omitempty"` + + // Optional ECS mapping for the query, not rendered into osqueryd configuration + ECSMapping map[string]interface{} `config:"ecs_mapping" json:"-"` + + // Always enforced as snapshot, can't be changed via configuration + Snapshot bool `json:"snapshot"` +} + +type Pack struct { + Discovery []string `config:"discovery" json:"discovery,omitempty"` + Platform string `config:"platform" json:"platform,omitempty"` + Version string `config:"version" json:"version,omitempty"` + Shard int `config:"shard" json:"shard,omitempty"` + Queries map[string]Query `config:"queries" json:"queries,omitempty"` +} + +// > SELECT * FROM osquery_events where type = 'subscriber'; +// +---------------------+---------------------+------------+---------------+--------+-----------+--------+ +// | name | publisher | type | subscriptions | events | refreshes | active | +// +---------------------+---------------------+------------+---------------+--------+-----------+--------+ +// | apparmor_events | auditeventpublisher | subscriber | 0 | 0 | 0 | 0 | +// | bpf_process_events | BPFEventPublisher | subscriber | 0 | 0 | 0 | 0 | +// | bpf_socket_events | BPFEventPublisher | subscriber | 0 | 0 | 0 | 0 | +// | file_events | inotify | subscriber | 0 | 0 | 0 | 0 | +// | hardware_events | udev | subscriber | 0 | 0 | 0 | 0 | +// | process_events | auditeventpublisher | subscriber | 0 | 0 | 0 | 0 | +// | process_file_events | auditeventpublisher | subscriber | 0 | 0 | 0 | 0 | +// | seccomp_events | auditeventpublisher | subscriber | 0 | 0 | 0 | 0 | +// | selinux_events | auditeventpublisher | subscriber | 0 | 0 | 0 | 0 | +// | socket_events | auditeventpublisher | subscriber | 0 | 0 | 0 | 0 | +// | syslog_events | syslog | subscriber | 0 | 0 | 0 | 0 | +// | user_events | auditeventpublisher | subscriber | 0 | 0 | 0 | 0 | +// | yara_events | inotify | subscriber | 0 | 0 | 0 | 0 | +// +---------------------+---------------------+------------+---------------+--------+-----------+--------+ + +// The configuration supports a method to explicitly allow and deny events subscribers. +// If you choose to explicitly allow subscribers, then all will be disabled except for those specificied in the allow list. +// If you choose to explicitly deny subscribers, then all will be enabled except for those specificied in the deny list. +type Events struct { + EnableSubscribers []string `config:"enable_subscribers" json:"enable_subscribers,omitempty"` + DisableSubscribers []string `config:"disable_subscribers" json:"disable_subscribers,omitempty"` +} + +type OsqueryConfig struct { + Options map[string]interface{} `config:"options" json:"options,omitempty"` + Schedule map[string]Query `config:"schedule" json:"schedule,omitempty"` + Packs map[string]Pack `config:"packs" json:"packs,omitempty"` + Filepaths map[string][]string `config:"file_paths" json:"file_paths,omitempty"` + Views map[string]string `config:"views" json:"views,omitempty"` + Events *Events `config:"events" json:"events,omitempty"` +} + +func (c OsqueryConfig) Render() ([]byte, error) { + return json.MarshalIndent(c, "", " ") +} diff --git a/x-pack/osquerybeat/internal/osqd/args.go b/x-pack/osquerybeat/internal/osqd/args.go new file mode 100644 index 00000000000..4fdb30fef12 --- /dev/null +++ b/x-pack/osquerybeat/internal/osqd/args.go @@ -0,0 +1,101 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package osqd + +import ( + "fmt" + "reflect" + "strings" +) + +const ( + osqueryPid = "osquery.pid" + osqueryDb = "osquery.db" + osqueryAutoload = "osquery.autoload" + osqueryFlagfile = "osquery.flags" + + defaultExtensionsInterval = 3 + defaultExtensionsTimeout = 10 +) + +type Args []string +type Flags map[string]interface{} + +func (f Flags) GetString(key string) string { + if f == nil { + return "" + } + if v, ok := f[key]; ok { + if s, ok := v.(string); ok { + return s + } + } + return "" +} + +func FlagsAreSame(flags1, flags2 Flags) bool { + return reflect.DeepEqual(flags1, flags2) +} + +// Some flags combinations to enable events collection and audits +// // Enable events collection +// "--disable_events=false", +// // Begin: enable process events audit +// "--disable_audit=false", +// "--audit_allow_config=true", +// "--audit_persist=true", +// "--audit_allow_process_events=true", +// // End: enable process events audit + +// // Begin: enable sockets audit +// "--audit_allow_sockets=true", +// "--audit_allow_unix=true", // Allow domain sockets audit +// // End: enable sockets audit + +var protectedFlags = Flags{ + "force": true, + "disable_watchdog": true, + "utc": true, + + // Setting this value to 1 will auto-clear events whenever a SELECT is performed against the table, reducing all impact of the buffer. + "events_expiry": 1, + + // Extensions socket path + "extensions_socket": "", + "extensions_interval": defaultExtensionsInterval, + "extensions_timeout": defaultExtensionsTimeout, + + // Path dependendent keys + "pidfile": osqueryPid, + "database_path": osqueryDb, + "extensions_autoload": osqueryAutoload, + "flagfile": osqueryFlagfile, + + // Plugins + "config_plugin": "", + "logger_plugin": "", + + // The delimiter for a full query name that is concatenated as "pack_" + {{pack name}} + "_" + {{query name}} by default + "pack_delimiter": "_", + "config_refresh": 10, +} + +func convertToArgs(flags Flags) Args { + if flags == nil { + return nil + } + + sz := len(flags) + args := make([]string, 0, sz) + for k, v := range flags { + sval := fmt.Sprint(v) + // Appending args, skipping the values that contain space + if !strings.ContainsRune(sval, ' ') { + s := fmt.Sprint("--", k, "=", v) + args = append(args, s) + } + } + return args +} diff --git a/x-pack/osquerybeat/internal/osqd/args_test.go b/x-pack/osquerybeat/internal/osqd/args_test.go new file mode 100644 index 00000000000..fb63847d72b --- /dev/null +++ b/x-pack/osquerybeat/internal/osqd/args_test.go @@ -0,0 +1,52 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package osqd + +import ( + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestFlagsAreSame(t *testing.T) { + tests := []struct { + Name string + Flags1, Flags2 Flags + Expected bool + }{ + { + Name: "both nils", + Expected: true, + }, + { + Name: "first nil, second non", + Flags1: nil, + Flags2: Flags{ + "foo": "bar", + }, + Expected: false, + }, + { + Name: "same", + Flags1: Flags{ + "foo": "bar", + }, + Flags2: Flags{ + "foo": "bar", + }, + Expected: true, + }, + } + + for _, tc := range tests { + t.Run(tc.Name, func(t *testing.T) { + res := FlagsAreSame(tc.Flags1, tc.Flags2) + diff := cmp.Diff(tc.Expected, res) + if diff != "" { + t.Error(diff) + } + }) + } +} diff --git a/x-pack/osquerybeat/internal/osqd/osqueryd.go b/x-pack/osquerybeat/internal/osqd/osqueryd.go index 8ac67d826f6..7060dba9676 100644 --- a/x-pack/osquerybeat/internal/osqd/osqueryd.go +++ b/x-pack/osquerybeat/internal/osqd/osqueryd.go @@ -11,7 +11,6 @@ import ( "io/ioutil" "os" "os/exec" - "path" "path/filepath" "strings" "sync" @@ -25,13 +24,10 @@ import ( ) const ( - osqueryDName = "osqueryd" - osqueryAutoload = "osquery.autoload" - osqueryFlagfile = "osquery.flags" + osqueryDName = "osqueryd" ) const ( - defaultExtensionsTimeout = 10 defaultExitTimeout = 10 * time.Second defaultDataDir = "osquery" defaultConfigRefreshInterval = 30 // interval osqueryd will poll for configuration changed; scheduled queries configuration for now @@ -145,14 +141,14 @@ func (q *OSQueryD) Check(ctx context.Context) error { } // Run executes osqueryd binary as a child process -func (q *OSQueryD) Run(ctx context.Context) error { +func (q *OSQueryD) Run(ctx context.Context, flags Flags) error { cleanup, err := q.prepare(ctx) if err != nil { return err } defer cleanup() - cmd := q.createCommand() + cmd := q.createCommand(flags) q.log.Debugf("start osqueryd process: args: %v", cmd.Args) @@ -201,9 +197,7 @@ func (q *OSQueryD) Run(ctx context.Context) error { wg.Add(1) go func() { defer wg.Done() - select { - case finished <- wait(): - } + finished <- wait() }() select { @@ -266,7 +260,7 @@ func (q *OSQueryD) prepare(ctx context.Context) (func(), error) { } // Write the autoload file - extensionAutoloadPath := q.osqueryAutoloadPath() + extensionAutoloadPath := q.resolveDataPath(osqueryAutoload) if err := ioutil.WriteFile(extensionAutoloadPath, []byte(extensionPath), 0644); err != nil { return nil, errors.Wrap(err, "failed write osquery extension autoload file") } @@ -274,7 +268,7 @@ func (q *OSQueryD) prepare(ctx context.Context) (func(), error) { // Write the flagsfile in order to lock down/prevent loading default flags from osquery global locations. // Otherwise the osqueryi and osqueryd will try to load the default flags file, // for example from /var/osquery/osquery.flags.default on Mac, and can potentially mess up configuration of our osquery instance. - flagsfilePath := q.osqueryFlagfilePath() + flagsfilePath := q.resolveDataPath(osqueryFlagfile) exists, err := fileutil.FileExists(flagsfilePath) if err != nil { return nil, errors.Wrapf(err, "failed to check flagsfile path") @@ -302,59 +296,54 @@ func (q *OSQueryD) prepareBinPath() error { return nil } -func (q *OSQueryD) createCommand() *exec.Cmd { +func (q *OSQueryD) args(userFlags Flags) Args { + flags := make(Flags, len(userFlags)) - cmd := exec.Command( - osquerydPath(q.binPath), - "--force=true", - "--disable_watchdog", - "--utc", - // // Enable events collection - // "--disable_events=false", - // // Begin: enable process events audit - // "--disable_audit=false", - // "--audit_allow_config=true", - // "--audit_persist=true", - // "--audit_allow_process_events=true", - // // End: enable process events audit - - // // Begin: enable sockets audit - // "--audit_allow_sockets=true", - // "--audit_allow_unix=true", // Allow domain sockets audit - // // End: enable sockets audit - - // // Setting this value to 1 will auto-clear events whenever a SELECT is performed against the table, reducing all impact of the buffer. - // "--events_expiry=1", - - "--pidfile="+path.Join(q.dataPath, "osquery.pid"), - "--database_path="+path.Join(q.dataPath, "osquery.db"), - "--extensions_socket="+q.socketPath, - "--logger_path="+q.dataPath, - "--extensions_autoload="+q.osqueryAutoloadPath(), - "--flagfile="+q.osqueryFlagfilePath(), - "--extensions_interval=3", - fmt.Sprint("--extensions_timeout=", q.extensionsTimeout), - ) + // Copy user flags + for k, userValue := range userFlags { + flags[k] = userValue + } + + // Copy protected flags, protected keys overwrite the user keys + for k, v := range protectedFlags { + flags[k] = v + } + + flags["pidfile"] = q.resolveDataPath(flags.GetString("pidfile")) + flags["database_path"] = q.resolveDataPath(flags.GetString("database_path")) + flags["extensions_autoload"] = q.resolveDataPath(flags.GetString("extensions_autoload")) + flags["flagfile"] = q.resolveDataPath(flags.GetString("flagfile")) + + flags["extensions_socket"] = q.socketPath + + if q.extensionsTimeout > 0 { + flags["extensions_timeout"] = q.extensionsTimeout + + } if q.configPlugin != "" { - cmd.Args = append(cmd.Args, "--config_plugin="+q.configPlugin) + flags["config_plugin"] = q.configPlugin } if q.loggerPlugin != "" { - cmd.Args = append(cmd.Args, "--logger_plugin="+q.loggerPlugin) + flags["logger_plugin"] = q.loggerPlugin } if q.configRefreshInterval > 0 { - cmd.Args = append(cmd.Args, fmt.Sprintf("--config_refresh=%d", q.configRefreshInterval)) + flags["config_refresh"] = q.configRefreshInterval } - cmd.Args = append(cmd.Args, platformArgs()...) - if q.isVerbose() { - cmd.Args = append(cmd.Args, "--verbose") - cmd.Args = append(cmd.Args, "--disable_logging=false") + flags["verbose"] = true + flags["disable_logging"] = false } - return cmd + + return convertToArgs(flags) +} + +func (q *OSQueryD) createCommand(userFlags Flags) *exec.Cmd { + return exec.Command( + osquerydPath(q.binPath), q.args(userFlags)...) } func (q *OSQueryD) isVerbose() bool { @@ -369,12 +358,8 @@ func osqueryExtensionPath(dir string) string { return filepath.Join(dir, extensionName) } -func (q *OSQueryD) osqueryAutoloadPath() string { - return filepath.Join(q.dataPath, osqueryAutoload) -} - -func (q *OSQueryD) osqueryFlagfilePath() string { - return filepath.Join(q.dataPath, osqueryFlagfile) +func (q *OSQueryD) resolveDataPath(filename string) string { + return filepath.Join(q.dataPath, filename) } func (q *OSQueryD) logOSQueryOutput(ctx context.Context, r io.ReadCloser) error {