Skip to content

Commit

Permalink
Osquerybeat: Support custom namespaces for scheduled queries (#27479)
Browse files Browse the repository at this point in the history
* Osquerybeat: Support custom namespaces for scheduled queries

* Some unit test coverage for the input namespaces

* Update namespaces resolution: they should be immeditelly avaiable while the queries info should update only upon the configuration poll from osqueryd
  • Loading branch information
aleksmaus authored Aug 19, 2021
1 parent 4b442ce commit bbf7a58
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 41 deletions.
2 changes: 1 addition & 1 deletion x-pack/osquerybeat/beater/action_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (a *actionHandler) execute(ctx context.Context, req map[string]interface{})
if err != nil {
return fmt.Errorf("%v: %w", err, ErrQueryExecution)
}
return a.executeQuery(ctx, config.DefaultStreamIndex, ad, "", req)
return a.executeQuery(ctx, config.Datastream(config.DefaultNamespace), ad, "", req)
}

func (a *actionHandler) executeQuery(ctx context.Context, index string, ad actionData, responseID string, req map[string]interface{}) error {
Expand Down
65 changes: 48 additions & 17 deletions x-pack/osquerybeat/beater/config_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,35 @@ var (
ErrECSMappingIsTooDeep = errors.New("ECS mapping is too deep")
)

type QueryInfo struct {
QueryConfig query
ECSMapping ecs.Mapping
}

type queryInfoMap map[string]QueryInfo

type ConfigPlugin struct {
log *logp.Logger

mx sync.RWMutex

queriesCount int

// A map that allows to look up the original query by name for the column types resolution
queryMap map[string]query
// A map that allows to look up the queryInfo by query name
queryInfoMap queryInfoMap

// A map for the query ECS mapping lookups
ecsMap map[string]ecs.Mapping
// This map holds the new queries info before the configuration requested from the plugin.
// This replaces the queryInfoMap upon receiving GenerateConfig call from osqueryd.
// Until we receive this call from osqueryd we should use the previously set mapping,
// otherwise we potentially could receive the query result for the old queries before osqueryd requested the new configuration
// and we would not be able to resolve types or ECS mapping or the namespace.
newQueryInfoMap queryInfoMap

// Datastream namesapces map that allows to lookup the namespace per query.
// The datastream namespaces map is handled separatelly from query info
// because if we delay updating it until the osqueryd config refresh (up to 1 minute, the way we do with queryinfo)
// we could be sending data into the datastream with namespace that we don't have permissions meanwhile
namespaces map[string]string

// Packs
packs map[string]pack
Expand All @@ -53,7 +70,8 @@ type ConfigPlugin struct {

func NewConfigPlugin(log *logp.Logger) *ConfigPlugin {
p := &ConfigPlugin{
log: log.With("ctx", "config"),
log: log.With("ctx", "config"),
queryInfoMap: make(queryInfoMap),
}

return p
Expand All @@ -70,19 +88,18 @@ func (p *ConfigPlugin) Count() int {
return p.queriesCount
}

func (p *ConfigPlugin) ResolveName(name string) (sql string, ok bool) {
func (p *ConfigPlugin) LookupQueryInfo(name string) (qi QueryInfo, ok bool) {
p.mx.RLock()
defer p.mx.RUnlock()
sc, ok := p.queryMap[name]

return sc.Query, ok
qi, ok = p.queryInfoMap[name]
return qi, ok
}

func (p *ConfigPlugin) LookupECSMapping(name string) (m ecs.Mapping, ok bool) {
func (p *ConfigPlugin) LookupNamespace(name string) (ns string, ok bool) {
p.mx.RLock()
defer p.mx.RUnlock()
m, ok = p.ecsMap[name]
return m, ok
ns, ok = p.namespaces[name]
return ns, ok
}

func (p *ConfigPlugin) GenerateConfig(ctx context.Context) (map[string]string, error) {
Expand All @@ -96,6 +113,12 @@ func (p *ConfigPlugin) GenerateConfig(ctx context.Context) (map[string]string, e
return nil, err
}

// replace the query info map
if p.newQueryInfoMap != nil {
p.queryInfoMap = p.newQueryInfoMap
p.newQueryInfoMap = nil
}

return map[string]string{
configName: c,
}, nil
Expand Down Expand Up @@ -147,10 +170,12 @@ func (p *ConfigPlugin) render() (string, error) {
}

func (p *ConfigPlugin) set(inputs []config.InputConfig) error {
var err error

p.configString = ""
queriesCount := 0
p.queryMap = make(map[string]query)
p.ecsMap = make(map[string]ecs.Mapping)
newQueryInfoMap := make(map[string]QueryInfo)
namespaces := make(map[string]string)
p.packs = make(map[string]pack)
for _, input := range inputs {
pack := pack{
Expand All @@ -168,19 +193,25 @@ func (p *ConfigPlugin) set(inputs []config.InputConfig) error {
Version: stream.Version,
Snapshot: true, // enforce snapshot for all queries
}
p.queryMap[id] = query
var ecsm ecs.Mapping
if len(stream.ECSMapping) > 0 {
ecsm, err := flattenECSMapping(stream.ECSMapping)
ecsm, err = flattenECSMapping(stream.ECSMapping)
if err != nil {
return err
}
p.ecsMap[id] = ecsm
}
newQueryInfoMap[id] = QueryInfo{
QueryConfig: query,
ECSMapping: ecsm,
}
namespaces[id] = input.Datastream.Namespace
pack.Queries[stream.ID] = query
queriesCount++
}
p.packs[input.Name] = pack
}
p.newQueryInfoMap = newQueryInfoMap
p.namespaces = namespaces
p.queriesCount = queriesCount
return nil
}
Expand Down
57 changes: 47 additions & 10 deletions x-pack/osquerybeat/beater/config_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,11 @@ func TestSet(t *testing.T) {
}`
oneInputConfig := []config.InputConfig{
{
Name: "osquery-manager-1",
Type: "osquery",
Name: "osquery-manager-1",
Type: "osquery",
Datastream: config.DatastreamConfig{
Namespace: "custom",
},
Platform: "posix",
Version: "4.7.0",
Discovery: []string{
Expand Down Expand Up @@ -327,6 +330,30 @@ func TestSet(t *testing.T) {
t.Fatal(diff)
}

// Should not resolve the query until the config was generated
if tc.name == "one input" {
_, ok := cfgp.LookupQueryInfo("users")
diff = cmp.Diff(false, ok)
if diff != "" {
t.Fatal(diff)
}

// Check the namespaces set before configuration is generated
for _, input := range tc.inputs {
_, ok := cfgp.LookupNamespace("users")
diff = cmp.Diff(false, ok)
if diff != "" {
t.Fatal(diff)
}

diff = cmp.Diff(oneInputConfig[0].Datastream.Namespace, input.Datastream.Namespace)
if diff != "" {
t.Fatal(diff)
}

}
}

// test generate config
mcfg, err := cfgp.GenerateConfig(context.Background())
if err != nil {
Expand Down Expand Up @@ -354,29 +381,39 @@ func TestSet(t *testing.T) {
for _, input := range tc.inputs {
for _, stream := range input.Streams {
name := strings.Join([]string{"pack", input.Name, stream.ID}, "_")
sql, ok := cfgp.ResolveName(name)

ns, ok := cfgp.LookupNamespace(name)
if !ok {
t.Fatalf("failed to resolve namespace for %v", name)
}

qi, ok := cfgp.LookupQueryInfo(name)
if !ok {
t.Fatalf("failed to resolve name %v", name)
}
diff = cmp.Diff(sql, stream.Query)
diff = cmp.Diff(qi.QueryConfig.Query, stream.Query)
if diff != "" {
t.Error(diff)
}

diff = cmp.Diff(input.Datastream.Namespace, ns)
if diff != "" {
t.Error(diff)
}

if len(stream.ECSMapping) == 0 {
continue
}

// test that the query ecs mapping lookup succeeds
ecsm, ok := cfgp.LookupECSMapping(name)
if !ok {
t.Fatalf("failed to lookup ecs mapping for %v", name)
diff = cmp.Diff(tc.ecsm, qi.ECSMapping)
if diff != "" {
t.Error(diff)
}
diff = cmp.Diff(tc.ecsm, ecsm)
}
}

// test that unknown query can't be resolved
_, ok = cfgp.ResolveName("unknown query name")
_, ok = cfgp.LookupQueryInfo("unknown query name")
if ok {
t.Fatalf("unexpectedly resolved unknown query")
}
Expand Down
37 changes: 26 additions & 11 deletions x-pack/osquerybeat/beater/osquerybeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (bt *osquerybeat) Run(b *beat.Beat) error {
defer bt.close()

// Watch input configuration updates
inputConfigCh := config.WatchInputs(ctx)
inputConfigCh := config.WatchInputs(ctx, bt.log)

// Install osqueryd if needed
err = installOsquery(ctx)
Expand Down Expand Up @@ -164,7 +164,11 @@ func (bt *osquerybeat) Run(b *beat.Beat) error {
g.Go(func() error {
err := bt.runOsquery(ctx, b, osq, inputCh)
if err != nil {
bt.log.Errorf("Failed to run osqueryd: %v", err)
if errors.Is(err, context.Canceled) {
bt.log.Errorf("Osquery exited: %v", err)
} else {
bt.log.Errorf("Failed to run osquery: %v", err)
}
}
return err
})
Expand Down Expand Up @@ -234,7 +238,11 @@ func (bt *osquerybeat) runOsquery(ctx context.Context, b *beat.Beat, osq *osqd.O
g.Go(func() error {
err := osq.Run(ctx)
if err != nil {
bt.log.Errorf("Failed to run osqueryd: %v", err)
if errors.Is(err, context.Canceled) {
bt.log.Errorf("Osqueryd exited: %v", err)
} else {
bt.log.Errorf("Failed to run osqueryd: %v", err)
}
} else {
// When osqueryd is killed for example there is no error returned
// but we can't continue running. Exiting.
Expand Down Expand Up @@ -330,33 +338,40 @@ func runExtensionServer(ctx context.Context, socketPath string, configPlugin *Co
}

func (bt *osquerybeat) handleSnapshotResult(ctx context.Context, cli *osqdcli.Client, configPlugin *ConfigPlugin, res SnapshotResult) {
sql, ok := configPlugin.ResolveName(res.Name)
ns, ok := configPlugin.LookupNamespace(res.Name)
if !ok {
bt.log.Debugf("failed to lookup query namespace: %s, the query was possibly removed recently from the schedule", res.Name)
// Drop the scheduled query results since at this point we don't have the namespace for the datastream where to send the results to
// and the API key would not have permissions for that namespaces datastream to create the index
return
}

qi, ok := configPlugin.LookupQueryInfo(res.Name)
if !ok {
bt.log.Errorf("failed to resolve query name: %s", res.Name)
bt.log.Errorf("failed to lookup query info: %s", res.Name)
return
}

hits, err := cli.ResolveResult(ctx, sql, res.Hits)
hits, err := cli.ResolveResult(ctx, qi.QueryConfig.Query, res.Hits)
if err != nil {
bt.log.Errorf("failed to resolve query types: %s", res.Name)
bt.log.Errorf("failed to resolve query result types: %s", res.Name)
return
}

// Map to ECS
var ecsFields []common.MapStr
mapping, ok := configPlugin.LookupECSMapping(res.Name)
if ok && len(mapping) > 0 {
if ok && len(qi.ECSMapping) > 0 {
ecsFields = make([]common.MapStr, len(hits))
for i, hit := range hits {
ecsFields[i] = common.MapStr(mapping.Map(hit))
ecsFields[i] = common.MapStr(qi.ECSMapping.Map(hit))
}
} else {
// ECS mapping is optional, continue
bt.log.Debugf("ECS mapping is not found for query name: %s", res.Name)
}

responseID := uuid.Must(uuid.NewV4()).String()
bt.publishEvents(config.DefaultStreamIndex, res.Name, responseID, hits, ecsFields, nil)
bt.publishEvents(config.Datastream(ns), res.Name, responseID, hits, ecsFields, nil)
}

func (bt *osquerybeat) setManagerPayload(b *beat.Beat) {
Expand Down
16 changes: 15 additions & 1 deletion x-pack/osquerybeat/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
// type: logs
// query: select * from usb_devices

const DefaultStreamIndex = "logs-osquery_manager.result-default"
const DefaultNamespace = "default"

const datastreamPrefix = "logs-osquery_manager.result-"

type StreamConfig struct {
ID string `config:"id"`
Expand All @@ -30,9 +32,14 @@ type StreamConfig struct {
ECSMapping map[string]interface{} `config:"ecs_mapping"` // ECS mapping definition where the key is the source field in osquery result and the value is the destination fields in ECS
}

type DatastreamConfig struct {
Namespace string `config:"namespace"`
}

type InputConfig struct {
Name string `config:"name"`
Type string `config:"type"`
Datastream DatastreamConfig `config:"data_stream"` // Datastream configuration
Streams []StreamConfig `config:"streams"`
Processors processors.PluginConfig `config:"processors"`
Platform string `config:"iplatform"` // restrict all queries to a given platform, default is 'all' platforms; you may use commas to set multiple platforms
Expand All @@ -45,3 +52,10 @@ type Config struct {
}

var DefaultConfig = Config{}

func Datastream(namespace string) string {
if namespace == "" {
namespace = DefaultNamespace
}
return datastreamPrefix + namespace
}
Loading

0 comments on commit bbf7a58

Please sign in to comment.