Skip to content

Commit

Permalink
Osquerybeat: Set elastic_agent on the output data (#25512) (#25535)
Browse files Browse the repository at this point in the history
* Set elastic_agent on the output data

* Address code review comments

(cherry picked from commit 7a6ad95)

Co-authored-by: Aleksandr Maus <[email protected]>
  • Loading branch information
mergify[bot] and aleksmaus authored May 5, 2021
1 parent dbb35e9 commit 052b8dc
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 14 deletions.
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/program/supported.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions x-pack/elastic-agent/spec/osquerybeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ rules:
values:
- osquery

- inject_agent_info: {}

- filter:
selectors:
- inputs
Expand Down
98 changes: 87 additions & 11 deletions x-pack/osquerybeat/beater/osquerybeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/processors"

"github.com/elastic/beats/v7/x-pack/osquerybeat/internal/config"
"github.com/elastic/beats/v7/x-pack/osquerybeat/internal/distro"
Expand Down Expand Up @@ -159,7 +160,7 @@ func (bt *osquerybeat) Run(b *beat.Beat) error {
}

// Connect publisher
bt.client, err = b.Publisher.Connect()
processors, err := bt.reconnectPublisher(b, bt.config.Inputs)
if err != nil {
return err
}
Expand Down Expand Up @@ -201,7 +202,9 @@ func (bt *osquerybeat) Run(b *beat.Beat) error {
}

// Start queries execution scheduler
scheduler := NewScheduler(ctx, bt.query)
schedCtx, schedCancel := context.WithCancel(ctx)
scheduler := NewScheduler(schedCtx, bt.query)
defer schedCancel()
wg.Add(1)
go func() {
defer wg.Done()
Expand Down Expand Up @@ -251,12 +254,26 @@ func (bt *osquerybeat) Run(b *beat.Beat) error {
}
}

setManagerPayload := func(itypes []string) {
if b.Manager != nil {
b.Manager.SetPayload(map[string]interface{}{
"osquery_version": distro.OsquerydVersion(),
})
handleInputConfig := func(inputConfigs []config.InputConfig) error {
bt.log.Debug("Handle input configuration change")
// Only set processor if it was not set before
if processors == nil {
bt.log.Debug("Set processors for the first time")
var err error
processors, err = bt.reconnectPublisher(b, inputConfigs)
if err != nil {
bt.log.Errorf("Failed to connect beat publisher client, err: %v", err)
return err
}
} else {
bt.log.Debug("Processors are already set")
}

streams, inputTypes = config.StreamsFromInputs(inputConfigs)
registerActionHandlers(inputTypes)
bt.setManagerPayload(b)
loadSchedulerStreams(streams)
return nil
}

LOOP:
Expand All @@ -270,22 +287,79 @@ LOOP:
bt.log.Infof("Exited osqueryd process, error: %v", exitErr)
break LOOP
case inputConfigs := <-inputConfigCh:
streams, inputTypes = config.StreamsFromInputs(inputConfigs)
registerActionHandlers(inputTypes)
setManagerPayload(inputTypes)
loadSchedulerStreams(streams)
err = handleInputConfig(inputConfigs)
if err != nil {
bt.log.Errorf("Failed to handle input configuration, err: %v, exiting", err)
// Cancel scheduler
schedCancel()
break LOOP
}
}
}

// Unregister action handlers
unregisterActionHandlers()

// Wait for clean scheduler exit
bt.log.Debug("Wait clean beat run exit")
wg.Wait()
bt.log.Debug("Beat run exited, err: %v", err)

return err
}

func (bt *osquerybeat) setManagerPayload(b *beat.Beat) {
if b.Manager != nil {
b.Manager.SetPayload(map[string]interface{}{
"osquery_version": distro.OsquerydVersion(),
})
}
}

func (bt *osquerybeat) reconnectPublisher(b *beat.Beat, inputs []config.InputConfig) (*processors.Processors, error) {
processors, err := processorsForInputsConfig(inputs)
if err != nil {
return nil, err
}

bt.log.Debugf("Connect publisher with processors: %d", len(processors.All()))
// Connect publisher
client, err := b.Publisher.ConnectWith(beat.ClientConfig{
Processing: beat.ProcessingConfig{
Processor: processors,
},
})
if err != nil {
return nil, err
}

// Swap client
bt.mx.Lock()
defer bt.mx.Unlock()
oldclient := bt.client
bt.client = client
if oldclient != nil {
oldclient.Close()
}
return processors, nil
}

func processorsForInputsConfig(inputs []config.InputConfig) (procs *processors.Processors, err error) {
// Use only first input processor
// Every input will have a processor that adds the elastic_agent info, we need only one
// Not expecting other processors at the moment and this needs to work for 7.13
for _, input := range inputs {
if len(input.Processors) > 0 {
procs, err = processors.New(input.Processors)
if err != nil {
return nil, err
}
return procs, nil
}
}
return nil, nil
}

// Stop stops osquerybeat.
func (bt *osquerybeat) Stop() {
bt.close()
Expand Down Expand Up @@ -356,6 +430,8 @@ func (bt *osquerybeat) executeQuery(ctx context.Context, log *logp.Logger, index
return err
}

bt.mx.Lock()
defer bt.mx.Unlock()
for _, hit := range hits {
reqData := req["data"]
event := beat.Event{
Expand Down
7 changes: 5 additions & 2 deletions x-pack/osquerybeat/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ package config

import (
"time"

"github.com/elastic/beats/v7/libbeat/processors"
)

// Default index name for ad-hoc queries, since the dataset is defined at the stream level, for example:
Expand All @@ -29,8 +31,9 @@ type StreamConfig struct {
}

type InputConfig struct {
Type string `config:"type"`
Streams []StreamConfig `config:"streams"`
Type string `config:"type"`
Streams []StreamConfig `config:"streams"`
Processors processors.PluginConfig `config:"processors"`
}

type Config struct {
Expand Down

0 comments on commit 052b8dc

Please sign in to comment.