Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ingest Management] Agent supports capabilities definition #23848

Merged
merged 40 commits into from
Feb 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
e364446
base
michalpristas Jan 26, 2021
51ff837
test
michalpristas Jan 26, 2021
c78b8fe
yaml parsing
michalpristas Jan 26, 2021
5861d56
interim
michalpristas Jan 26, 2021
e5c76d8
Merge branch 'master' of github.com:elastic/beats into agent-21000
michalpristas Jan 27, 2021
d4f938d
rename
michalpristas Jan 27, 2021
cd7468f
upgrade logic done
michalpristas Jan 27, 2021
c21a7c7
expr
michalpristas Jan 27, 2021
c1d244b
input in progress
michalpristas Jan 28, 2021
33f6329
conditions
michalpristas Jan 28, 2021
595c253
multi inputs
michalpristas Jan 28, 2021
716c4d6
ast to map, saving 80perc allocations
michalpristas Jan 29, 2021
8667e67
output
michalpristas Jan 29, 2021
3214f2e
logging
michalpristas Jan 29, 2021
8ec47a0
load form file
michalpristas Jan 29, 2021
03acaea
Merge branch 'master' into agent-21000
michalpristas Feb 2, 2021
faacd96
prepare tests of load
michalpristas Feb 2, 2021
2472827
tests load
michalpristas Feb 3, 2021
0124574
tests load
michalpristas Feb 3, 2021
8882956
injecting
michalpristas Feb 3, 2021
29f3413
reporter in
michalpristas Feb 3, 2021
b094755
refinement
michalpristas Feb 3, 2021
7fa392f
unhealthy on filter
michalpristas Feb 4, 2021
0037255
Merge branch 'master' of github.com:elastic/beats into agent-21000
michalpristas Feb 4, 2021
d53b4eb
gitignore revert
michalpristas Feb 4, 2021
c98252a
changelog
michalpristas Feb 4, 2021
f3d0bc8
spelling
michalpristas Feb 4, 2021
4edee85
test fix
michalpristas Feb 4, 2021
a9e71d9
mage fmt
michalpristas Feb 4, 2021
71e3492
Merge branch 'master' of github.com:elastic/beats into agent-21000
michalpristas Feb 5, 2021
4e23669
subtree
michalpristas Feb 5, 2021
5b0bdca
Merge branch 'master' of github.com:elastic/beats into agent-21000
michalpristas Feb 5, 2021
75dc879
Merge branch 'master' of github.com:elastic/beats into agent-21000
michalpristas Feb 5, 2021
9a337b9
Merge branch 'master' of github.com:elastic/beats into agent-21000
michalpristas Feb 8, 2021
58a04ce
small refactor
michalpristas Feb 9, 2021
41443f9
Merge branch 'master' of github.com:elastic/beats into agent-21000
michalpristas Feb 9, 2021
ea33dff
lint
michalpristas Feb 9, 2021
02c88d8
comment typo
michalpristas Feb 9, 2021
6aa4ca0
Merge branch 'master' of github.com:elastic/beats into agent-21000
michalpristas Feb 12, 2021
c18b5cd
Merge branch 'master' of github.com:elastic/beats into agent-21000
michalpristas Feb 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
- Add --staging option to enroll command {pull}20026[20026]
- Add `event.dataset` to all events {pull}20076[20076]
- Send datastreams fields {pull}20416[20416]
- Agent supports capabilities definition {pull}23848[23848]

[[release-notes-7.8.0]]
=== Elastic Agent version 7.8.0
Expand Down
19 changes: 18 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/capabilities"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
Expand Down Expand Up @@ -42,6 +43,7 @@ type emitterController struct {
router programsDispatcher
modifiers *configModifiers
reloadables []reloadable
caps capabilities.Capability

// state
lock sync.RWMutex
Expand All @@ -65,6 +67,20 @@ func (e *emitterController) Update(c *config.Config) error {
if err != nil {
return errors.New(err, "could not create the AST from the configuration", errors.TypeConfig)
}

if e.caps != nil {
var ok bool
updatedAst, err := e.caps.Apply(rawAst)
if err != nil {
return errors.New(err, "failed to apply capabilities")
}

rawAst, ok = updatedAst.(*transpiler.AST)
if !ok {
return errors.New("failed to transform object returned from capabilities to AST", errors.TypeConfig)
}
}

for _, filter := range e.modifiers.Filters {
if err := filter(e.logger, rawAst); err != nil {
return errors.New(err, "failed to filter configuration", errors.TypeConfig)
Expand Down Expand Up @@ -142,7 +158,7 @@ func (e *emitterController) update() error {
return e.router.Dispatch(ast.HashStr(), programsToRun)
}

func emitter(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, controller composable.Controller, router programsDispatcher, modifiers *configModifiers, reloadables ...reloadable) (emitterFunc, error) {
func emitter(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, controller composable.Controller, router programsDispatcher, modifiers *configModifiers, caps capabilities.Capability, reloadables ...reloadable) (emitterFunc, error) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: The number of parameters gets a little out of hand. Do we really need this function, or can we just use the struct and have a separate "init" or "start" function/method?

Do we pass a tuple of ctx, log, agentInfo often? In the future we might even want to pass something for metrics and APM tracing. The 'tuple' could be combined into an appContext struct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm +1 on this here. would be worth a small refactor across all the codebase, we have this pattern i think on multple places. these things were simple at first but got bloated over time

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The emitter function is actually become more of a wrapper for the emitterController with the addition of dynamic inputs and conditions.

We should probably refactor that into a single controller.

log.Debugf("Supported programs: %s", strings.Join(program.KnownProgramNames(), ", "))

init, _ := transpiler.NewVars(map[string]interface{}{})
Expand All @@ -154,6 +170,7 @@ func emitter(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo,
modifiers: modifiers,
reloadables: reloadables,
vars: []*transpiler.Vars{init},
caps: caps,
}
err := controller.Run(ctx, func(vars []*transpiler.Vars) {
ctrl.Set(vars)
Expand Down
6 changes: 6 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/info/agent_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
)

// defaultAgentConfigFile is a name of file used to store agent information
const defaultAgentCapabilitiesFile = "capabilities.yml"
const defaultAgentConfigFile = "fleet.yml"
const agentInfoKey = "agent"

Expand All @@ -44,6 +45,11 @@ func AgentConfigFile() string {
return filepath.Join(paths.Config(), defaultAgentConfigFile)
}

// AgentCapabilitiesPath is a name of file used to store agent capabilities
func AgentCapabilitiesPath() string {
return filepath.Join(paths.Config(), defaultAgentCapabilitiesFile)
}

// AgentActionStoreFile is the file that contains the action that can be replayed after restart.
func AgentActionStoreFile() string {
return filepath.Join(paths.Home(), defaultAgentActionStoreFile)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/capabilities"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
)

Expand Down Expand Up @@ -118,7 +120,25 @@ func loadFleetConfig(cfg *config.Config) (map[string]interface{}, error) {
}

func printMapStringConfig(mapStr map[string]interface{}) error {
data, err := yaml.Marshal(mapStr)
l, err := newErrorLogger()
if err != nil {
return err
}
caps, err := capabilities.Load(info.AgentCapabilitiesPath(), l, status.NewController(l))
if err != nil {
return err
}

newCfg, err := caps.Apply(mapStr)
if err != nil {
return errors.New(err, "failed to apply capabilities")
}
newMap, ok := newCfg.(map[string]interface{})
if !ok {
return errors.New("config returned from capabilities has invalid type")
}

data, err := yaml.Marshal(newMap)
if err != nil {
return errors.New(err, "could not marshal to YAML")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/capabilities"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/noop"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status"
)

// InspectOutputCmd is an inspect subcommand that shows configurations of the agent.
Expand Down Expand Up @@ -207,13 +209,19 @@ func getProgramsFromConfig(log *logger.Logger, agentInfo *info.AgentInfo, cfg *c
modifiers.Filters = append(modifiers.Filters, injectFleet(cfg, sysInfo.Info(), agentInfo))
}

caps, err := capabilities.Load(info.AgentCapabilitiesPath(), log, status.NewController(log))
if err != nil {
return nil, err
}

emit, err := emitter(
ctx,
log,
agentInfo,
composableWaiter,
router,
modifiers,
caps,
monitor,
)
if err != nil {
Expand Down
10 changes: 9 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/capabilities"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
Expand Down Expand Up @@ -66,6 +67,11 @@ func newLocal(
agentInfo *info.AgentInfo,
) (*Local, error) {
statusController := &noopController{}
caps, err := capabilities.Load(info.AgentCapabilitiesPath(), log, statusController)
if err != nil {
return nil, err
}

cfg, err := configuration.NewFromConfig(rawConfig)
if err != nil {
return nil, err
Expand Down Expand Up @@ -120,6 +126,7 @@ func newLocal(
Decorators: []decoratorFunc{injectMonitoring},
Filters: []filterFunc{filters.StreamChecker},
},
caps,
monitor,
)
if err != nil {
Expand All @@ -145,7 +152,8 @@ func newLocal(
[]context.CancelFunc{localApplication.cancelCtxFn},
reexec,
newNoopAcker(),
reporter)
reporter,
caps)
uc.SetUpgrader(upgrader)

return localApplication, nil
Expand Down
10 changes: 9 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/capabilities"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
Expand Down Expand Up @@ -66,6 +67,11 @@ func newManaged(
agentInfo *info.AgentInfo,
) (*Managed, error) {
statusController := status.NewController(log)
caps, err := capabilities.Load(info.AgentCapabilitiesPath(), log, statusController)
if err != nil {
return nil, err
}

path := info.AgentConfigFile()

store := storage.NewDiskStore(path)
Expand Down Expand Up @@ -173,6 +179,7 @@ func newManaged(
Decorators: []decoratorFunc{injectMonitoring},
Filters: []filterFunc{filters.StreamChecker, injectFleet(config, sysInfo.Info(), agentInfo)},
},
caps,
monitor,
)
if err != nil {
Expand Down Expand Up @@ -205,7 +212,8 @@ func newManaged(
[]context.CancelFunc{managedApplication.cancelCtxFn},
reexec,
acker,
combinedReporter)
combinedReporter,
caps)

policyChanger := &handlerPolicyChange{
log: log,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestManagedModeRouting(t *testing.T) {
agentInfo, _ := info.NewAgentInfo()
nullStore := &storage.NullStore{}
composableCtrl, _ := composable.New(log, nil)
emit, err := emitter(ctx, log, agentInfo, composableCtrl, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}})
emit, err := emitter(ctx, log, agentInfo, composableCtrl, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}}, nil)
require.NoError(t, err)

actionDispatcher, err := newActionDispatcher(ctx, log, &handlerDefault{log: log})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ import (
type noopController struct{}

func (*noopController) Register(_ string) status.Reporter { return &noopReporter{} }
func (*noopController) Status() status.AgentStatus { return status.Healthy }
func (*noopController) UpdateStateID(_ string) {}
func (*noopController) StatusString() string { return "online" }
func (*noopController) RegisterWithPersistance(_ string, _ bool) status.Reporter {
return &noopReporter{}
}
func (*noopController) Status() status.AgentStatus { return status.Healthy }
func (*noopController) UpdateStateID(_ string) {}
func (*noopController) StatusString() string { return "online" }

type noopReporter struct{}

Expand Down
11 changes: 10 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/install"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/capabilities"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
Expand Down Expand Up @@ -48,6 +49,7 @@ type Upgrader struct {
acker acker
reporter stateReporter
upgradeable bool
caps capabilities.Capability
}

// Action is the upgrade action state.
Expand Down Expand Up @@ -81,7 +83,7 @@ func IsUpgradeable() bool {
}

// NewUpgrader creates an upgrader which is capable of performing upgrade operation
func NewUpgrader(agentInfo *info.AgentInfo, settings *artifact.Config, log *logger.Logger, closers []context.CancelFunc, reexec reexecManager, a acker, r stateReporter) *Upgrader {
func NewUpgrader(agentInfo *info.AgentInfo, settings *artifact.Config, log *logger.Logger, closers []context.CancelFunc, reexec reexecManager, a acker, r stateReporter, caps capabilities.Capability) *Upgrader {
return &Upgrader{
agentInfo: agentInfo,
settings: settings,
Expand All @@ -91,6 +93,7 @@ func NewUpgrader(agentInfo *info.AgentInfo, settings *artifact.Config, log *logg
acker: a,
reporter: r,
upgradeable: IsUpgradeable(),
caps: caps,
}
}

Expand All @@ -116,6 +119,12 @@ func (u *Upgrader) Upgrade(ctx context.Context, a Action, reexecNow bool) (err e
"running under control of the systems supervisor")
}

if u.caps != nil {
if _, err := u.caps.Apply(a); err == capabilities.ErrBlocked {
return nil
}
}

u.reportUpdating(a.Version())

sourceURI, err := u.sourceURI(a.Version(), a.SourceURI())
Expand Down
97 changes: 97 additions & 0 deletions x-pack/elastic-agent/pkg/capabilities/capabilities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package capabilities

import (
"errors"
"os"

"gopkg.in/yaml.v2"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status"
)

// Capability provides a way of applying predefined filter to object.
// It's up to capability to determine if capability is applicable on object.
type Capability interface {
// Apply applies capabilities on input and returns true if input should be completely blocked
// otherwise, false and updated input is returned
Apply(interface{}) (interface{}, error)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still would say the interface is doing to much, while not really abstracting things.

The input and output types are interface{}, that is the internal representation of the objects are not really abstracted away behind coherent interfaces. Instead we rely on internal knowledge of the potential structures to be passed from different call sides. Due to the need to downcast and analyze the internal structure, we fail to have proper abstractions here. Instead we allow for potential bugs being introduced in the future in case we change the internal representation for e.g. Inputs to a proper go-type without finding and updating the right capabilities.

The Apply method indicates that we are doing a transformation. Yet we only want to filter. Would it make sense to change the interface to act more like a predicate and have the code to move through the configured structures in some other central place?
My understanding is that we have the 'walking' on the structure already in the transpiler, why should we replicate the walking in the capabilities?

E.g.

type Capability interface {
  // TestFeature returns nil if the kind and typeName are accepted by the capability.
  // ErrDeny is returned if the capability rejects the current feature.
  TestFeature(kind ComponentKind, typeName string) (err error)
}

var ErrDeny = errors.New("deny")

type ComponentKind int

const (
  ComponentInput ComponentKind = iota
  ComponentOutput
)

Alternatively (in case we want transformations), we might want to pass the root configuration object to Apply. In that case only the implementation is abstracted away, but the input type is known. If we change the go type for inputs/outputs to proper structures, the actual implementation of the input capability should fail to compile.

}

var (
// ErrBlocked is returned when capability is blocking.
ErrBlocked = errors.New("capability blocked")
)

type capabilitiesManager struct {
caps []Capability
reporter status.Reporter
}

type capabilityFactory func(*logger.Logger, *ruleDefinitions, status.Reporter) (Capability, error)

// Load loads capabilities files and prepares manager.
func Load(capsFile string, log *logger.Logger, sc status.Controller) (Capability, error) {
handlers := []capabilityFactory{
newInputsCapability,
newOutputsCapability,
newUpgradesCapability,
}

cm := &capabilitiesManager{
caps: make([]Capability, 0),
reporter: sc.RegisterWithPersistance("capabilities", true),
}

// load capabilities from file
fd, err := os.Open(capsFile)
if err != nil && !os.IsNotExist(err) {
return cm, err
}

if os.IsNotExist(err) {
log.Infof("capabilities file not found in %s", capsFile)
return cm, nil
}
defer fd.Close()

definitions := &ruleDefinitions{Capabilities: make([]ruler, 0)}
dec := yaml.NewDecoder(fd)
if err := dec.Decode(&definitions); err != nil {
return cm, err
}

// make list of handlers out of capabilities definition
for _, h := range handlers {
cap, err := h(log, definitions, cm.reporter)
if err != nil {
return nil, err
}

if cap == nil {
continue
}

cm.caps = append(cm.caps, cap)
}

return cm, nil
}

func (mgr *capabilitiesManager) Apply(in interface{}) (interface{}, error) {
var err error
// reset health on start, child caps will update to fail if needed
mgr.reporter.Update(status.Healthy)
for _, cap := range mgr.caps {
in, err = cap.Apply(in)
if err != nil {
return in, err
}
}

return in, nil
}
Loading