From 282a7bcf34e8ef8b68de7dfb44e15c640a96aa2e Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Mon, 15 Feb 2021 14:43:29 +0100 Subject: [PATCH] [Ingest Management] Agent supports capabilities definition (#23848) [Ingest Management] Agent supports capabilities definition (#23848) --- x-pack/elastic-agent/CHANGELOG.asciidoc | 1 + .../pkg/agent/application/emitter.go | 19 +- .../pkg/agent/application/info/agent_id.go | 6 + .../agent/application/inspect_config_cmd.go | 22 +- .../agent/application/inspect_output_cmd.go | 8 + .../pkg/agent/application/local_mode.go | 10 +- .../pkg/agent/application/managed_mode.go | 10 +- .../agent/application/managed_mode_test.go | 2 +- .../application/noop_status_controller.go | 9 +- .../pkg/agent/application/upgrade/upgrade.go | 11 +- .../pkg/capabilities/capabilities.go | 97 +++++ .../pkg/capabilities/capabilities_test.go | 346 +++++++++++++++ x-pack/elastic-agent/pkg/capabilities/expr.go | 37 ++ .../pkg/capabilities/expr_test.go | 43 ++ .../elastic-agent/pkg/capabilities/input.go | 265 ++++++++++++ .../pkg/capabilities/input_test.go | 398 ++++++++++++++++++ .../elastic-agent/pkg/capabilities/output.go | 211 ++++++++++ .../pkg/capabilities/output_test.go | 373 ++++++++++++++++ x-pack/elastic-agent/pkg/capabilities/rule.go | 112 +++++ .../pkg/capabilities/rule_test.go | 122 ++++++ .../testdata/allow_metrics-capabilities.yml | 6 + .../testdata/allow_metrics-config.yml | 20 + .../testdata/allow_metrics-result.yml | 14 + .../testdata/deny_logs-capabilities.yml | 4 + .../testdata/deny_logs-config.yml | 20 + .../testdata/deny_logs-result.yml | 14 + .../testdata/filter_metrics-capabilities.yml | 4 + .../testdata/filter_metrics-config.yml | 20 + .../testdata/filter_metrics-result.yml | 20 + .../testdata/invalid-capabilities.yml | 6 + .../capabilities/testdata/invalid-config.yml | 18 + .../testdata/invalid_output-capabilities.yml | 4 + .../testdata/invalid_output-config.yml | 19 + .../capabilities/testdata/no_caps-config.yml | 20 + .../capabilities/testdata/no_caps-result.yml | 20 + .../elastic-agent/pkg/capabilities/upgrade.go | 191 +++++++++ .../pkg/capabilities/upgrade_test.go | 344 +++++++++++++++ .../elastic-agent/pkg/core/status/reporter.go | 11 +- 38 files changed, 2847 insertions(+), 10 deletions(-) create mode 100644 x-pack/elastic-agent/pkg/capabilities/capabilities.go create mode 100644 x-pack/elastic-agent/pkg/capabilities/capabilities_test.go create mode 100644 x-pack/elastic-agent/pkg/capabilities/expr.go create mode 100644 x-pack/elastic-agent/pkg/capabilities/expr_test.go create mode 100644 x-pack/elastic-agent/pkg/capabilities/input.go create mode 100644 x-pack/elastic-agent/pkg/capabilities/input_test.go create mode 100644 x-pack/elastic-agent/pkg/capabilities/output.go create mode 100644 x-pack/elastic-agent/pkg/capabilities/output_test.go create mode 100644 x-pack/elastic-agent/pkg/capabilities/rule.go create mode 100644 x-pack/elastic-agent/pkg/capabilities/rule_test.go create mode 100644 x-pack/elastic-agent/pkg/capabilities/testdata/allow_metrics-capabilities.yml create mode 100644 x-pack/elastic-agent/pkg/capabilities/testdata/allow_metrics-config.yml create mode 100644 x-pack/elastic-agent/pkg/capabilities/testdata/allow_metrics-result.yml create mode 100644 x-pack/elastic-agent/pkg/capabilities/testdata/deny_logs-capabilities.yml create mode 100644 x-pack/elastic-agent/pkg/capabilities/testdata/deny_logs-config.yml create mode 100644 x-pack/elastic-agent/pkg/capabilities/testdata/deny_logs-result.yml create mode 100644 x-pack/elastic-agent/pkg/capabilities/testdata/filter_metrics-capabilities.yml create mode 100644 x-pack/elastic-agent/pkg/capabilities/testdata/filter_metrics-config.yml create mode 100644 x-pack/elastic-agent/pkg/capabilities/testdata/filter_metrics-result.yml create mode 100644 x-pack/elastic-agent/pkg/capabilities/testdata/invalid-capabilities.yml create mode 100644 x-pack/elastic-agent/pkg/capabilities/testdata/invalid-config.yml create mode 100644 x-pack/elastic-agent/pkg/capabilities/testdata/invalid_output-capabilities.yml create mode 100644 x-pack/elastic-agent/pkg/capabilities/testdata/invalid_output-config.yml create mode 100644 x-pack/elastic-agent/pkg/capabilities/testdata/no_caps-config.yml create mode 100644 x-pack/elastic-agent/pkg/capabilities/testdata/no_caps-result.yml create mode 100644 x-pack/elastic-agent/pkg/capabilities/upgrade.go create mode 100644 x-pack/elastic-agent/pkg/capabilities/upgrade_test.go diff --git a/x-pack/elastic-agent/CHANGELOG.asciidoc b/x-pack/elastic-agent/CHANGELOG.asciidoc index d94ad8aee0d..58f827aca17 100644 --- a/x-pack/elastic-agent/CHANGELOG.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.asciidoc @@ -53,6 +53,7 @@ - Add --staging option to enroll command {pull}20026[20026] - Add `event.dataset` to all events {pull}20076[20076] - Send datastreams fields {pull}20416[20416] +- Agent supports capabilities definition {pull}23848[23848] [[release-notes-7.8.0]] === Elastic Agent version 7.8.0 diff --git a/x-pack/elastic-agent/pkg/agent/application/emitter.go b/x-pack/elastic-agent/pkg/agent/application/emitter.go index 5faa6814e77..0b575db52f7 100644 --- a/x-pack/elastic-agent/pkg/agent/application/emitter.go +++ b/x-pack/elastic-agent/pkg/agent/application/emitter.go @@ -14,6 +14,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/capabilities" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" @@ -42,6 +43,7 @@ type emitterController struct { router programsDispatcher modifiers *configModifiers reloadables []reloadable + caps capabilities.Capability // state lock sync.RWMutex @@ -65,6 +67,20 @@ func (e *emitterController) Update(c *config.Config) error { if err != nil { return errors.New(err, "could not create the AST from the configuration", errors.TypeConfig) } + + if e.caps != nil { + var ok bool + updatedAst, err := e.caps.Apply(rawAst) + if err != nil { + return errors.New(err, "failed to apply capabilities") + } + + rawAst, ok = updatedAst.(*transpiler.AST) + if !ok { + return errors.New("failed to transform object returned from capabilities to AST", errors.TypeConfig) + } + } + for _, filter := range e.modifiers.Filters { if err := filter(e.logger, rawAst); err != nil { return errors.New(err, "failed to filter configuration", errors.TypeConfig) @@ -142,7 +158,7 @@ func (e *emitterController) update() error { return e.router.Dispatch(ast.HashStr(), programsToRun) } -func emitter(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, controller composable.Controller, router programsDispatcher, modifiers *configModifiers, reloadables ...reloadable) (emitterFunc, error) { +func emitter(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, controller composable.Controller, router programsDispatcher, modifiers *configModifiers, caps capabilities.Capability, reloadables ...reloadable) (emitterFunc, error) { log.Debugf("Supported programs: %s", strings.Join(program.KnownProgramNames(), ", ")) init, _ := transpiler.NewVars(map[string]interface{}{}) @@ -154,6 +170,7 @@ func emitter(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, modifiers: modifiers, reloadables: reloadables, vars: []*transpiler.Vars{init}, + caps: caps, } err := controller.Run(ctx, func(vars []*transpiler.Vars) { ctrl.Set(vars) diff --git a/x-pack/elastic-agent/pkg/agent/application/info/agent_id.go b/x-pack/elastic-agent/pkg/agent/application/info/agent_id.go index a6551b68580..f189c43d01e 100644 --- a/x-pack/elastic-agent/pkg/agent/application/info/agent_id.go +++ b/x-pack/elastic-agent/pkg/agent/application/info/agent_id.go @@ -20,6 +20,7 @@ import ( ) // defaultAgentConfigFile is a name of file used to store agent information +const defaultAgentCapabilitiesFile = "capabilities.yml" const defaultAgentConfigFile = "fleet.yml" const agentInfoKey = "agent" @@ -44,6 +45,11 @@ func AgentConfigFile() string { return filepath.Join(paths.Config(), defaultAgentConfigFile) } +// AgentCapabilitiesPath is a name of file used to store agent capabilities +func AgentCapabilitiesPath() string { + return filepath.Join(paths.Config(), defaultAgentCapabilitiesFile) +} + // AgentActionStoreFile is the file that contains the action that can be replayed after restart. func AgentActionStoreFile() string { return filepath.Join(paths.Home(), defaultAgentActionStoreFile) diff --git a/x-pack/elastic-agent/pkg/agent/application/inspect_config_cmd.go b/x-pack/elastic-agent/pkg/agent/application/inspect_config_cmd.go index 15dccd83a88..5655aad4d3e 100644 --- a/x-pack/elastic-agent/pkg/agent/application/inspect_config_cmd.go +++ b/x-pack/elastic-agent/pkg/agent/application/inspect_config_cmd.go @@ -13,7 +13,9 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/capabilities" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" ) @@ -118,7 +120,25 @@ func loadFleetConfig(cfg *config.Config) (map[string]interface{}, error) { } func printMapStringConfig(mapStr map[string]interface{}) error { - data, err := yaml.Marshal(mapStr) + l, err := newErrorLogger() + if err != nil { + return err + } + caps, err := capabilities.Load(info.AgentCapabilitiesPath(), l, status.NewController(l)) + if err != nil { + return err + } + + newCfg, err := caps.Apply(mapStr) + if err != nil { + return errors.New(err, "failed to apply capabilities") + } + newMap, ok := newCfg.(map[string]interface{}) + if !ok { + return errors.New("config returned from capabilities has invalid type") + } + + data, err := yaml.Marshal(newMap) if err != nil { return errors.New(err, "could not marshal to YAML") } diff --git a/x-pack/elastic-agent/pkg/agent/application/inspect_output_cmd.go b/x-pack/elastic-agent/pkg/agent/application/inspect_output_cmd.go index 49ec8d0b3eb..2ec9280dfa0 100644 --- a/x-pack/elastic-agent/pkg/agent/application/inspect_output_cmd.go +++ b/x-pack/elastic-agent/pkg/agent/application/inspect_output_cmd.go @@ -17,10 +17,12 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/capabilities" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/noop" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" ) // InspectOutputCmd is an inspect subcommand that shows configurations of the agent. @@ -207,6 +209,11 @@ func getProgramsFromConfig(log *logger.Logger, agentInfo *info.AgentInfo, cfg *c modifiers.Filters = append(modifiers.Filters, injectFleet(cfg, sysInfo.Info(), agentInfo)) } + caps, err := capabilities.Load(info.AgentCapabilitiesPath(), log, status.NewController(log)) + if err != nil { + return nil, err + } + emit, err := emitter( ctx, log, @@ -214,6 +221,7 @@ func getProgramsFromConfig(log *logger.Logger, agentInfo *info.AgentInfo, cfg *c composableWaiter, router, modifiers, + caps, monitor, ) if err != nil { diff --git a/x-pack/elastic-agent/pkg/agent/application/local_mode.go b/x-pack/elastic-agent/pkg/agent/application/local_mode.go index 5123cd5f834..e805c7423aa 100644 --- a/x-pack/elastic-agent/pkg/agent/application/local_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/local_mode.go @@ -14,6 +14,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/capabilities" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" @@ -66,6 +67,11 @@ func newLocal( agentInfo *info.AgentInfo, ) (*Local, error) { statusController := &noopController{} + caps, err := capabilities.Load(info.AgentCapabilitiesPath(), log, statusController) + if err != nil { + return nil, err + } + cfg, err := configuration.NewFromConfig(rawConfig) if err != nil { return nil, err @@ -120,6 +126,7 @@ func newLocal( Decorators: []decoratorFunc{injectMonitoring}, Filters: []filterFunc{filters.StreamChecker}, }, + caps, monitor, ) if err != nil { @@ -145,7 +152,8 @@ func newLocal( []context.CancelFunc{localApplication.cancelCtxFn}, reexec, newNoopAcker(), - reporter) + reporter, + caps) uc.SetUpgrader(upgrader) return localApplication, nil diff --git a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go index 9ad1f24a3d0..c5c6d542d8f 100644 --- a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go @@ -20,6 +20,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/capabilities" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" @@ -66,6 +67,11 @@ func newManaged( agentInfo *info.AgentInfo, ) (*Managed, error) { statusController := status.NewController(log) + caps, err := capabilities.Load(info.AgentCapabilitiesPath(), log, statusController) + if err != nil { + return nil, err + } + path := info.AgentConfigFile() store := storage.NewDiskStore(path) @@ -173,6 +179,7 @@ func newManaged( Decorators: []decoratorFunc{injectMonitoring}, Filters: []filterFunc{filters.StreamChecker, injectFleet(config, sysInfo.Info(), agentInfo)}, }, + caps, monitor, ) if err != nil { @@ -205,7 +212,8 @@ func newManaged( []context.CancelFunc{managedApplication.cancelCtxFn}, reexec, acker, - combinedReporter) + combinedReporter, + caps) policyChanger := &handlerPolicyChange{ log: log, diff --git a/x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go b/x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go index 4325c5ce7a6..890183e9033 100644 --- a/x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go +++ b/x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go @@ -39,7 +39,7 @@ func TestManagedModeRouting(t *testing.T) { agentInfo, _ := info.NewAgentInfo() nullStore := &storage.NullStore{} composableCtrl, _ := composable.New(log, nil) - emit, err := emitter(ctx, log, agentInfo, composableCtrl, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}}) + emit, err := emitter(ctx, log, agentInfo, composableCtrl, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}}, nil) require.NoError(t, err) actionDispatcher, err := newActionDispatcher(ctx, log, &handlerDefault{log: log}) diff --git a/x-pack/elastic-agent/pkg/agent/application/noop_status_controller.go b/x-pack/elastic-agent/pkg/agent/application/noop_status_controller.go index 61528e0b761..1c55fda08e7 100644 --- a/x-pack/elastic-agent/pkg/agent/application/noop_status_controller.go +++ b/x-pack/elastic-agent/pkg/agent/application/noop_status_controller.go @@ -11,9 +11,12 @@ import ( type noopController struct{} func (*noopController) Register(_ string) status.Reporter { return &noopReporter{} } -func (*noopController) Status() status.AgentStatus { return status.Healthy } -func (*noopController) UpdateStateID(_ string) {} -func (*noopController) StatusString() string { return "online" } +func (*noopController) RegisterWithPersistance(_ string, _ bool) status.Reporter { + return &noopReporter{} +} +func (*noopController) Status() status.AgentStatus { return status.Healthy } +func (*noopController) UpdateStateID(_ string) {} +func (*noopController) StatusString() string { return "online" } type noopReporter struct{} diff --git a/x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go b/x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go index 9b7a271a9d7..4e75d92fb1d 100644 --- a/x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go +++ b/x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go @@ -18,6 +18,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/install" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/capabilities" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" @@ -48,6 +49,7 @@ type Upgrader struct { acker acker reporter stateReporter upgradeable bool + caps capabilities.Capability } // Action is the upgrade action state. @@ -81,7 +83,7 @@ func IsUpgradeable() bool { } // NewUpgrader creates an upgrader which is capable of performing upgrade operation -func NewUpgrader(agentInfo *info.AgentInfo, settings *artifact.Config, log *logger.Logger, closers []context.CancelFunc, reexec reexecManager, a acker, r stateReporter) *Upgrader { +func NewUpgrader(agentInfo *info.AgentInfo, settings *artifact.Config, log *logger.Logger, closers []context.CancelFunc, reexec reexecManager, a acker, r stateReporter, caps capabilities.Capability) *Upgrader { return &Upgrader{ agentInfo: agentInfo, settings: settings, @@ -91,6 +93,7 @@ func NewUpgrader(agentInfo *info.AgentInfo, settings *artifact.Config, log *logg acker: a, reporter: r, upgradeable: IsUpgradeable(), + caps: caps, } } @@ -116,6 +119,12 @@ func (u *Upgrader) Upgrade(ctx context.Context, a Action, reexecNow bool) (err e "running under control of the systems supervisor") } + if u.caps != nil { + if _, err := u.caps.Apply(a); err == capabilities.ErrBlocked { + return nil + } + } + u.reportUpdating(a.Version()) sourceURI, err := u.sourceURI(a.Version(), a.SourceURI()) diff --git a/x-pack/elastic-agent/pkg/capabilities/capabilities.go b/x-pack/elastic-agent/pkg/capabilities/capabilities.go new file mode 100644 index 00000000000..498e09ac101 --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/capabilities.go @@ -0,0 +1,97 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package capabilities + +import ( + "errors" + "os" + + "gopkg.in/yaml.v2" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" +) + +// Capability provides a way of applying predefined filter to object. +// It's up to capability to determine if capability is applicable on object. +type Capability interface { + // Apply applies capabilities on input and returns true if input should be completely blocked + // otherwise, false and updated input is returned + Apply(interface{}) (interface{}, error) +} + +var ( + // ErrBlocked is returned when capability is blocking. + ErrBlocked = errors.New("capability blocked") +) + +type capabilitiesManager struct { + caps []Capability + reporter status.Reporter +} + +type capabilityFactory func(*logger.Logger, *ruleDefinitions, status.Reporter) (Capability, error) + +// Load loads capabilities files and prepares manager. +func Load(capsFile string, log *logger.Logger, sc status.Controller) (Capability, error) { + handlers := []capabilityFactory{ + newInputsCapability, + newOutputsCapability, + newUpgradesCapability, + } + + cm := &capabilitiesManager{ + caps: make([]Capability, 0), + reporter: sc.RegisterWithPersistance("capabilities", true), + } + + // load capabilities from file + fd, err := os.Open(capsFile) + if err != nil && !os.IsNotExist(err) { + return cm, err + } + + if os.IsNotExist(err) { + log.Infof("capabilities file not found in %s", capsFile) + return cm, nil + } + defer fd.Close() + + definitions := &ruleDefinitions{Capabilities: make([]ruler, 0)} + dec := yaml.NewDecoder(fd) + if err := dec.Decode(&definitions); err != nil { + return cm, err + } + + // make list of handlers out of capabilities definition + for _, h := range handlers { + cap, err := h(log, definitions, cm.reporter) + if err != nil { + return nil, err + } + + if cap == nil { + continue + } + + cm.caps = append(cm.caps, cap) + } + + return cm, nil +} + +func (mgr *capabilitiesManager) Apply(in interface{}) (interface{}, error) { + var err error + // reset health on start, child caps will update to fail if needed + mgr.reporter.Update(status.Healthy) + for _, cap := range mgr.caps { + in, err = cap.Apply(in) + if err != nil { + return in, err + } + } + + return in, nil +} diff --git a/x-pack/elastic-agent/pkg/capabilities/capabilities_test.go b/x-pack/elastic-agent/pkg/capabilities/capabilities_test.go new file mode 100644 index 00000000000..d59b7c62939 --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/capabilities_test.go @@ -0,0 +1,346 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package capabilities + +import ( + "fmt" + "io" + "os" + "path/filepath" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" +) + +func TestLoadCapabilities(t *testing.T) { + testCases := []string{ + "filter_metrics", + "allow_metrics", + "deny_logs", + "no_caps", + } + + l, _ := logger.New("test") + + for _, tc := range testCases { + t.Run(tc, func(t *testing.T) { + filename := filepath.Join("testdata", fmt.Sprintf("%s-capabilities.yml", tc)) + controller := status.NewController(l) + caps, err := Load(filename, l, controller) + assert.NoError(t, err) + assert.NotNil(t, caps) + + cfg, configCloser := getConfigWithCloser(t, filepath.Join("testdata", fmt.Sprintf("%s-config.yml", tc))) + defer configCloser.Close() + + mm, err := cfg.ToMapStr() + assert.NoError(t, err) + assert.NotNil(t, mm) + + out, err := caps.Apply(mm) + assert.NoError(t, err, "should not be failing") + assert.NotEqual(t, ErrBlocked, err, "should not be blocking") + + resultConfig, ok := out.(map[string]interface{}) + assert.True(t, ok) + + expectedConfig, resultCloser := getConfigWithCloser(t, filepath.Join("testdata", fmt.Sprintf("%s-result.yml", tc))) + defer resultCloser.Close() + + expectedMap, err := expectedConfig.ToMapStr() + fixInputsType(expectedMap) + fixInputsType(resultConfig) + + if !assert.True(t, cmp.Equal(expectedMap, resultConfig)) { + diff := cmp.Diff(expectedMap, resultConfig) + if diff != "" { + t.Errorf("%s mismatch (-want +got):\n%s", tc, diff) + } + } + }) + } +} + +func TestInvalidLoadCapabilities(t *testing.T) { + testCases := []string{ + "invalid", + "invalid_output", + } + + l, _ := logger.New("test") + + for _, tc := range testCases { + t.Run(tc, func(t *testing.T) { + filename := filepath.Join("testdata", fmt.Sprintf("%s-capabilities.yml", tc)) + controller := status.NewController(l) + caps, err := Load(filename, l, controller) + assert.NoError(t, err) + assert.NotNil(t, caps) + + cfg, configCloser := getConfigWithCloser(t, filepath.Join("testdata", fmt.Sprintf("%s-config.yml", tc))) + defer configCloser.Close() + + mm, err := cfg.ToMapStr() + assert.NoError(t, err) + assert.NotNil(t, mm) + + _, err = caps.Apply(mm) + assert.Error(t, err, "should be failing") + assert.NotEqual(t, ErrBlocked, err, "should not be blocking") + }) + } +} + +func getConfigWithCloser(t *testing.T, cfgFile string) (*config.Config, io.Closer) { + configFile, err := os.Open(cfgFile) + require.NoError(t, err) + + cfg, err := config.NewConfigFrom(configFile) + require.NoError(t, err) + require.NotNil(t, cfg) + + return cfg, configFile +} + +func fixInputsType(mm map[string]interface{}) { + if i, found := mm[inputsKey]; found { + var inputs []interface{} + + if im, ok := i.([]map[string]interface{}); ok { + for _, val := range im { + inputs = append(inputs, val) + } + } else if im, ok := i.([]interface{}); ok { + inputs = im + } + mm[inputsKey] = inputs + } +} + +func TestCapabilityManager(t *testing.T) { + l := newErrorLogger(t) + + t.Run("filter", func(t *testing.T) { + m := getConfig() + mgr := &capabilitiesManager{ + caps: []Capability{ + filterKeywordCap{keyWord: "filter"}, + }, + reporter: status.NewController(l).Register("test"), + } + + newIn, err := mgr.Apply(m) + assert.NoError(t, err, "should not be failing") + assert.NotEqual(t, ErrBlocked, err, "should not be blocking") + + newMap, ok := newIn.(map[string]string) + assert.True(t, ok, "new input is not a map") + + _, found := newMap["filter"] + assert.False(t, found, "filter does not filter keyword") + + val, found := newMap["key"] + assert.True(t, found, "filter filters additional keys") + assert.Equal(t, "val", val, "filter modifies additional keys") + }) + + t.Run("filter before block", func(t *testing.T) { + m := getConfig() + mgr := &capabilitiesManager{ + caps: []Capability{ + filterKeywordCap{keyWord: "filter"}, + blockCap{}, + }, + reporter: status.NewController(l).Register("test"), + } + + newIn, err := mgr.Apply(m) + assert.Error(t, err, "should be failing") + assert.Equal(t, ErrBlocked, err, "should be blocking") + + newMap, ok := newIn.(map[string]string) + assert.True(t, ok, "new input is not a map") + + _, found := newMap["filter"] + assert.False(t, found, "filter does not filter keyword") + + val, found := newMap["key"] + assert.True(t, found, "filter filters additional keys") + assert.Equal(t, "val", val, "filter modifies additional keys") + }) + + t.Run("filter after block", func(t *testing.T) { + m := getConfig() + mgr := &capabilitiesManager{ + caps: []Capability{ + filterKeywordCap{keyWord: "filter"}, + blockCap{}, + }, + reporter: status.NewController(l).Register("test"), + } + + newIn, err := mgr.Apply(m) + assert.Error(t, err, "should be failing") + assert.Equal(t, ErrBlocked, err, "should be blocking") + + newMap, ok := newIn.(map[string]string) + assert.True(t, ok, "new input is not a map") + + _, found := newMap["filter"] + assert.False(t, found, "filter does not filter keyword") + + val, found := newMap["key"] + assert.True(t, found, "filter filters additional keys") + assert.Equal(t, "val", val, "filter modifies additional keys") + }) + + t.Run("filter before keep", func(t *testing.T) { + m := getConfig() + mgr := &capabilitiesManager{ + caps: []Capability{ + filterKeywordCap{keyWord: "filter"}, + keepAsIsCap{}, + }, + reporter: status.NewController(l).Register("test"), + } + + newIn, err := mgr.Apply(m) + assert.NoError(t, err, "should not be failing") + assert.NotEqual(t, ErrBlocked, err, "should not be blocking") + + newMap, ok := newIn.(map[string]string) + assert.True(t, ok, "new input is not a map") + + _, found := newMap["filter"] + assert.False(t, found, "filter does not filter keyword") + + val, found := newMap["key"] + assert.True(t, found, "filter filters additional keys") + assert.Equal(t, "val", val, "filter modifies additional keys") + }) + + t.Run("filter after keep", func(t *testing.T) { + m := getConfig() + mgr := &capabilitiesManager{ + caps: []Capability{ + filterKeywordCap{keyWord: "filter"}, + keepAsIsCap{}, + }, + reporter: status.NewController(l).Register("test"), + } + + newIn, err := mgr.Apply(m) + assert.NoError(t, err, "should not be failing") + assert.NotEqual(t, ErrBlocked, err, "should not be blocking") + + newMap, ok := newIn.(map[string]string) + assert.True(t, ok, "new input is not a map") + + _, found := newMap["filter"] + assert.False(t, found, "filter does not filter keyword") + + val, found := newMap["key"] + assert.True(t, found, "filter filters additional keys") + assert.Equal(t, "val", val, "filter modifies additional keys") + }) + + t.Run("filter before filter", func(t *testing.T) { + m := getConfig() + mgr := &capabilitiesManager{ + caps: []Capability{ + filterKeywordCap{keyWord: "filter"}, + filterKeywordCap{keyWord: "key"}, + }, + reporter: status.NewController(l).Register("test"), + } + + newIn, err := mgr.Apply(m) + assert.NoError(t, err, "should not be failing") + assert.NotEqual(t, ErrBlocked, err, "should not be blocking") + + newMap, ok := newIn.(map[string]string) + assert.True(t, ok, "new input is not a map") + + _, found := newMap["filter"] + assert.False(t, found, "filter does not filter keyword") + + _, found = newMap["key"] + assert.False(t, found, "filter filters additional keys") + }) + t.Run("filter after filter", func(t *testing.T) { + m := getConfig() + mgr := &capabilitiesManager{ + caps: []Capability{ + filterKeywordCap{keyWord: "key"}, + filterKeywordCap{keyWord: "filter"}, + }, + reporter: status.NewController(l).Register("test"), + } + + newIn, err := mgr.Apply(m) + assert.NoError(t, err, "should not be failing") + assert.NotEqual(t, ErrBlocked, err, "should not be blocking") + + newMap, ok := newIn.(map[string]string) + assert.True(t, ok, "new input is not a map") + + _, found := newMap["filter"] + assert.False(t, found, "filter does not filter keyword") + + _, found = newMap["key"] + assert.False(t, found, "filter filters additional keys") + }) +} + +type keepAsIsCap struct{} + +func (keepAsIsCap) Apply(in interface{}) (interface{}, error) { + return in, nil +} + +type blockCap struct{} + +func (blockCap) Apply(in interface{}) (interface{}, error) { + return in, ErrBlocked +} + +type filterKeywordCap struct { + keyWord string +} + +func (f filterKeywordCap) Apply(in interface{}) (interface{}, error) { + mm, ok := in.(map[string]string) + if !ok { + return in, nil + } + + delete(mm, f.keyWord) + return mm, nil +} + +func getConfig() map[string]string { + return map[string]string{ + "filter": "f_val", + "key": "val", + } +} + +func newErrorLogger(t *testing.T) *logger.Logger { + t.Helper() + + loggerCfg := logger.DefaultLoggingConfig() + loggerCfg.Level = logp.ErrorLevel + + log, err := logger.NewFromConfig("", loggerCfg) + require.NoError(t, err) + return log +} diff --git a/x-pack/elastic-agent/pkg/capabilities/expr.go b/x-pack/elastic-agent/pkg/capabilities/expr.go new file mode 100644 index 00000000000..8d22f61d519 --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/expr.go @@ -0,0 +1,37 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package capabilities + +import "strings" + +const ( + wild = "*" + separator = "/" +) + +func matchesExpr(pattern, target string) bool { + if pattern == wild { + return true + } + + patternParts := strings.Split(pattern, separator) + targetParts := strings.Split(target, separator) + + if len(patternParts) != len(targetParts) { + return false + } + + for i, pp := range patternParts { + if pp == wild { + continue + } + + if pp != targetParts[i] { + return false + } + } + + return true +} diff --git a/x-pack/elastic-agent/pkg/capabilities/expr_test.go b/x-pack/elastic-agent/pkg/capabilities/expr_test.go new file mode 100644 index 00000000000..2021d68d4a6 --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/expr_test.go @@ -0,0 +1,43 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package capabilities + +import ( + "fmt" + "testing" + + "gotest.tools/assert" +) + +func TestExpr(t *testing.T) { + cases := []struct { + Pattern string + Value string + ShouldMatch bool + }{ + {"", "", true}, + {"*", "", true}, + {"*", "test", true}, + {"*", "system/test", true}, + {"system/*", "system/test", true}, + {"*/test", "system/test", true}, + {"*/*", "system/test", true}, + {"system/*", "agent/test", false}, + {"*/test", "test/system", false}, + {"*/test", "test", false}, + {"*/*", "test", false}, + } + + for i, tc := range cases { + t.Run(fmt.Sprintf("testcase #%d", i), func(tt *testing.T) { + match := matchesExpr(tc.Pattern, tc.Value) + assert.Equal(t, + tc.ShouldMatch, + match, + fmt.Sprintf("'%s' and '%s' and expecting should match: %v", tc.Pattern, tc.Value, tc.ShouldMatch), + ) + }) + } +} diff --git a/x-pack/elastic-agent/pkg/capabilities/input.go b/x-pack/elastic-agent/pkg/capabilities/input.go new file mode 100644 index 00000000000..7ddc1a22496 --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/input.go @@ -0,0 +1,265 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package capabilities + +import ( + "fmt" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" +) + +const ( + inputsKey = "inputs" +) + +func newInputsCapability(log *logger.Logger, rd *ruleDefinitions, reporter status.Reporter) (Capability, error) { + if rd == nil { + return &multiInputsCapability{log: log, caps: []*inputCapability{}}, nil + } + + caps := make([]*inputCapability, 0, len(rd.Capabilities)) + + for _, r := range rd.Capabilities { + c, err := newInputCapability(log, r, reporter) + if err != nil { + return nil, err + } + + if c != nil { + caps = append(caps, c) + } + } + + return &multiInputsCapability{log: log, caps: caps}, nil +} + +func newInputCapability(log *logger.Logger, r ruler, reporter status.Reporter) (*inputCapability, error) { + cap, ok := r.(*inputCapability) + if !ok { + return nil, nil + } + + cap.log = log + cap.reporter = reporter + return cap, nil +} + +type inputCapability struct { + log *logger.Logger + reporter status.Reporter + Name string `json:"name,omitempty" yaml:"name,omitempty"` + Type string `json:"rule" yaml:"rule"` + Input string `json:"input" yaml:"input"` +} + +func (c *inputCapability) Apply(cfgMap map[string]interface{}) (map[string]interface{}, error) { + inputsIface, ok := cfgMap[inputsKey] + if ok { + if inputs := inputsMap(inputsIface, c.log); inputs != nil { + renderedInputs, err := c.renderInputs(inputs) + if err != nil { + c.log.Errorf("marking inputs failed for capability '%s': %v", c.name(), err) + return cfgMap, err + } + + cfgMap[inputsKey] = renderedInputs + return cfgMap, nil + } + + return cfgMap, nil + } + + return cfgMap, nil +} + +func inputsMap(cfgInputs interface{}, l *logger.Logger) []map[string]interface{} { + if inputs, ok := cfgInputs.([]map[string]interface{}); ok { + return inputs + } + + inputsSet, ok := cfgInputs.([]interface{}) + if !ok { + l.Warn("inputs is not an array") + return nil + } + + inputsMap := make([]map[string]interface{}, 0, len(inputsSet)) + for _, s := range inputsSet { + mm, ok := s.(map[string]interface{}) + if !ok { + continue + } + inputsMap = append(inputsMap, mm) + } + + return inputsMap +} + +func (c *inputCapability) Rule() string { + return c.Type +} + +func (c *inputCapability) name() string { + if c.Name != "" { + return c.Name + } + + t := "A" + if c.Type == denyKey { + t = "D" + } + + // e.g IA(*) or ID(system/*) + c.Name = fmt.Sprintf("I%s(%s)", t, c.Input) + return c.Name +} + +func (c *inputCapability) renderInputs(inputs []map[string]interface{}) ([]map[string]interface{}, error) { + newInputs := make([]map[string]interface{}, 0, len(inputs)) + + for i, input := range inputs { + inputTypeIface, found := input[typeKey] + if !found { + return newInputs, errors.New(fmt.Sprintf("input '%d' is missing type key", i), errors.TypeConfig) + } + + inputType, ok := inputTypeIface.(string) + if !ok { + newInputs = append(newInputs, input) + continue + } + + // if input does not match definition continue + if !matchesExpr(c.Input, inputType) { + newInputs = append(newInputs, input) + continue + } + + if _, found := input[conditionKey]; found { + // we already visited + newInputs = append(newInputs, input) + continue + } + + isSupported := c.Type == allowKey + + input[conditionKey] = isSupported + if !isSupported { + c.log.Errorf("input '%s' is left out due to capability restriction '%s'", inputType, c.name()) + c.reporter.Update(status.Degraded) + } + + newInputs = append(newInputs, input) + } + + return newInputs, nil +} + +type multiInputsCapability struct { + caps []*inputCapability + log *logger.Logger +} + +func (c *multiInputsCapability) Apply(in interface{}) (interface{}, error) { + inputsMap, transform, err := configObject(in) + if err != nil { + c.log.Errorf("constructing config object failed for 'multi-inputs' capability '%s': %v", err) + return in, nil + } + if inputsMap == nil { + return in, nil + } + + for _, cap := range c.caps { + // input capability is not blocking + inputsMap, err = cap.Apply(inputsMap) + if err != nil { + return in, err + } + } + + inputsMap, err = c.cleanupInput(inputsMap) + if err != nil { + c.log.Errorf("cleaning up config object failed for capability 'multi-outputs': %v", err) + return in, nil + } + + if transform == nil { + return inputsMap, nil + } + + return transform(inputsMap), nil +} + +func (c *multiInputsCapability) cleanupInput(cfgMap map[string]interface{}) (map[string]interface{}, error) { + inputsIface, found := cfgMap[inputsKey] + if !found { + return cfgMap, nil + } + + inputsList, ok := inputsIface.([]map[string]interface{}) + if !ok { + return nil, fmt.Errorf("inputs must be an array") + } + + newInputs := make([]map[string]interface{}, 0, len(inputsList)) + + for _, inputMap := range inputsList { + acceptValue := true + conditionIface, found := inputMap[conditionKey] + if found { + conditionVal, ok := conditionIface.(bool) + if ok { + acceptValue = conditionVal + } + } + + if !acceptValue { + continue + } + + delete(inputMap, conditionKey) + newInputs = append(newInputs, inputMap) + } + + cfgMap[inputsKey] = newInputs + return cfgMap, nil +} + +func configObject(a interface{}) (map[string]interface{}, func(interface{}) interface{}, error) { + if ast, ok := a.(*transpiler.AST); ok { + fn := func(i interface{}) interface{} { + mm, ok := i.(map[string]interface{}) + if !ok { + return i + } + + ast, err := transpiler.NewAST(mm) + if err != nil { + return i + } + return ast + } + mm, err := ast.Map() + if err != nil { + return nil, nil, err + } + + return mm, fn, nil + } + + if mm, ok := a.(map[string]interface{}); ok { + fn := func(i interface{}) interface{} { + // return as is + return i + } + return mm, fn, nil + } + + return nil, nil, nil +} diff --git a/x-pack/elastic-agent/pkg/capabilities/input_test.go b/x-pack/elastic-agent/pkg/capabilities/input_test.go new file mode 100644 index 00000000000..8416a81649b --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/input_test.go @@ -0,0 +1,398 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package capabilities + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" +) + +func TestMultiInput(t *testing.T) { + tr := &testReporter{} + l, _ := logger.New("test") + t.Run("no match", func(t *testing.T) { + + rd := &ruleDefinitions{ + Capabilities: []ruler{&inputCapability{ + Type: "allow", + Input: "something_else", + }}, + } + + initialInputs := []string{"system/metrics", "system/logs"} + expectedInputs := []string{"system/metrics", "system/logs"} + runMultiInputTest(t, l, rd, expectedInputs, initialInputs) + }) + + t.Run("filters metrics", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{&inputCapability{ + Type: "deny", + Input: "system/metrics", + }}, + } + + initialInputs := []string{"system/metrics", "system/logs"} + expectedInputs := []string{"system/logs"} + runMultiInputTest(t, l, rd, expectedInputs, initialInputs) + }) + + t.Run("allows metrics only", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{ + &inputCapability{ + Type: "allow", + Input: "system/metrics", + }, + &inputCapability{ + Type: "deny", + Input: "*", + }, + }, + } + + initialInputs := []string{"system/metrics", "system/logs", "something_else"} + expectedInputs := []string{"system/metrics"} + runMultiInputTest(t, l, rd, expectedInputs, initialInputs) + }) + + t.Run("allows everything", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{&inputCapability{ + Type: "allow", + Input: "*", + }}, + } + + initialInputs := []string{"system/metrics", "system/logs"} + expectedInputs := []string{"system/metrics", "system/logs"} + runMultiInputTest(t, l, rd, expectedInputs, initialInputs) + }) + + t.Run("deny everything", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{&inputCapability{ + Type: "deny", + Input: "*", + }}, + } + + initialInputs := []string{"system/metrics", "system/logs"} + expectedInputs := []string{} + runMultiInputTest(t, l, rd, expectedInputs, initialInputs) + }) + + t.Run("deny everything with noise", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{ + &inputCapability{ + Type: "deny", + Input: "*", + }, + &inputCapability{ + Type: "allow", + Input: "something_else", + }, + }, + } + + initialInputs := []string{"system/metrics", "system/logs"} + expectedInputs := []string{} + runMultiInputTest(t, l, rd, expectedInputs, initialInputs) + }) + + t.Run("keep format", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{&inputCapability{ + Type: "deny", + Input: "system/metrics", + }}, + } + + initialInputs := []string{"system/metrics", "system/logs"} + expectedInputs := []string{"system/logs"} + + cap, err := newInputsCapability(l, rd, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + + inputs := getInputs(initialInputs...) + assert.NotNil(t, inputs) + + res, err := cap.Apply(inputs) + assert.NoError(t, err, "should not be failing") + assert.NotEqual(t, ErrBlocked, err, "should not be blocking") + + ast, ok := res.(*transpiler.AST) + assert.True(t, ok, "expecting an ast") + + inputsIface, found := transpiler.Lookup(ast, "inputs") + assert.True(t, found, "input not found") + + inputsList, ok := inputsIface.Value().(*transpiler.List) + assert.True(t, ok, "expecting a list for inputs") + + for _, in := range expectedInputs { + var typeFound bool + nodes := inputsList.Value().([]transpiler.Node) + for _, inputNode := range nodes { + typeNode, found := inputNode.Find("type") + assert.True(t, found, "type not found") + + typeNodeStr, ok := typeNode.Value().(*transpiler.StrVal) + assert.True(t, ok, "type node not strval") + inputType, ok := typeNodeStr.Value().(string) + assert.True(t, ok, "input type key not string") + if inputType == in { + typeFound = true + break + } + } + + assert.True(t, typeFound, fmt.Sprintf("input '%s' type key not found", in)) + } + }) + + t.Run("unknown action", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{&inputCapability{ + Type: "deny", + Input: "system/metrics", + }}, + } + cap, err := newInputsCapability(l, rd, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + + apiAction := fleetapi.ActionUpgrade{} + outAfter, err := cap.Apply(apiAction) + + assert.NoError(t, err, "should not be failing") + assert.NotEqual(t, ErrBlocked, err, "should not be blocking") + assert.Equal(t, apiAction, outAfter, "action should not be altered") + }) +} + +func TestInput(t *testing.T) { + l, _ := logger.New("test") + tr := &testReporter{} + t.Run("invalid rule", func(t *testing.T) { + r := &upgradeCapability{} + cap, err := newInputCapability(l, r, tr) + assert.NoError(t, err, "no error expected") + assert.Nil(t, cap, "cap should not be created") + }) + + t.Run("empty eql", func(t *testing.T) { + r := &inputCapability{ + Type: "allow", + Input: "", + } + cap, err := newInputCapability(l, r, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + }) + + t.Run("valid action - 1/1 match", func(t *testing.T) { + r := &inputCapability{ + Type: "allow", + Input: "system/metrics", + } + + initialInputs := []string{"system/metrics"} + expectedInputs := []string{"system/metrics"} + runInputTest(t, l, r, expectedInputs, initialInputs) + }) + + t.Run("valid action - 0/1 match", func(t *testing.T) { + r := &inputCapability{ + Type: "allow", + Input: "system/metrics", + } + + initialInputs := []string{"system/logs"} + expectedInputs := []string{"system/logs"} + runInputTest(t, l, r, expectedInputs, initialInputs) + }) + + t.Run("valid action - deny metrics", func(t *testing.T) { + r := &inputCapability{ + Type: "deny", + Input: "system/metrics", + } + + initialInputs := []string{"system/metrics", "system/logs"} + expectedInputs := []string{"system/logs"} + runInputTest(t, l, r, expectedInputs, initialInputs) + }) + + t.Run("valid action - multiple inputs 1 explicitely allowed", func(t *testing.T) { + r := &inputCapability{ + Type: "allow", + Input: "system/metrics", + } + + initialInputs := []string{"system/metrics", "system/logs"} + expectedInputs := []string{"system/metrics", "system/logs"} + runInputTest(t, l, r, expectedInputs, initialInputs) + }) + +} + +func runInputTest(t *testing.T, l *logger.Logger, r *inputCapability, expectedInputs []string, initialInputs []string) { + tr := &testReporter{} + cap, err := newInputCapability(l, r, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + + inputs := getInputsMap(initialInputs...) + assert.NotNil(t, inputs) + + newMap, err := cap.Apply(inputs) + assert.NoError(t, err, "should not be failing") + assert.NotEqual(t, ErrBlocked, err, "should not be blocking") + + inputsNode, found := newMap["inputs"] + assert.True(t, found, "inputsnot found") + + inputsList, ok := inputsNode.([]map[string]interface{}) + assert.True(t, ok, "inputs not a list") + + typesMap := make(map[string]bool) + for _, node := range inputsList { + typeNode, ok := node["type"] + if !ok { + continue + } + + inputType, ok := typeNode.(string) + if !ok { + continue + } + + conditionNode, ok := node[conditionKey] + if !ok { + // was not allowed nor denied -> allowing + typesMap[inputType] = true + continue + } + + isAllowed, ok := conditionNode.(bool) + if !ok { + assert.Fail(t, fmt.Sprintf("condition should be bool but it's not for input '%s'", inputType)) + continue + } + + if isAllowed { + typesMap[inputType] = true + } + } + + assert.Equal(t, len(expectedInputs), len(typesMap)) + for _, ei := range expectedInputs { + _, found = typesMap[ei] + assert.True(t, found, fmt.Sprintf("'%s' not found", ei)) + delete(typesMap, ei) + } + + for k := range typesMap { + assert.Fail(t, fmt.Sprintf("'%s' found but was not expected", k)) + } +} + +func runMultiInputTest(t *testing.T, l *logger.Logger, rd *ruleDefinitions, expectedInputs []string, initialInputs []string) { + tr := &testReporter{} + cap, err := newInputsCapability(l, rd, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + + inputs := getInputsMap(initialInputs...) + assert.NotNil(t, inputs) + + outAfter, err := cap.Apply(inputs) + assert.NoError(t, err, "should not be failing") + assert.NotEqual(t, ErrBlocked, err, "should not be blocking") + + newMap, ok := outAfter.(map[string]interface{}) + assert.True(t, ok, "out ast should be AST") + assert.NotNil(t, newMap) + + inputsNode, found := newMap["inputs"] + assert.True(t, found, "inputsnot found") + + inputsList, ok := inputsNode.([]map[string]interface{}) + assert.True(t, ok, "inputs not a list") + + typesMap := make(map[string]bool) + for _, node := range inputsList { + typeNode, ok := node["type"] + if !ok { + continue + } + + inputType, ok := typeNode.(string) + if !ok { + continue + } + typesMap[inputType] = true + } + + assert.Equal(t, len(expectedInputs), len(typesMap)) + for _, ei := range expectedInputs { + _, found = typesMap[ei] + assert.True(t, found, fmt.Sprintf("'%s' not found", ei)) + delete(typesMap, ei) + } + + for k := range typesMap { + assert.Fail(t, fmt.Sprintf("'%s' found but was not expected", k)) + } +} + +func getInputs(tt ...string) *transpiler.AST { + astMap := getInputsMap(tt...) + ast, _ := transpiler.NewAST(astMap) + return ast +} + +func getInputsMap(tt ...string) map[string]interface{} { + astMap := make(map[string]interface{}) + inputs := make([]map[string]interface{}, 0, len(tt)) + + for _, t := range tt { + mm := map[string]interface{}{ + "type": t, + "use_output": "testing", + "data_stream.namespace": "default", + "streams": []map[string]interface{}{ + { + "metricset": "cpu", + "data_stream.dataset": "system.cpu", + }, + { + "metricset": "memory", + "data_stream.dataset": "system.memory", + }, + }, + } + inputs = append(inputs, mm) + } + + astMap["inputs"] = inputs + + return astMap +} + +type testReporter struct{} + +func (*testReporter) Update(status.AgentStatus) {} +func (*testReporter) Unregister() {} diff --git a/x-pack/elastic-agent/pkg/capabilities/output.go b/x-pack/elastic-agent/pkg/capabilities/output.go new file mode 100644 index 00000000000..34a9ca6e055 --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/output.go @@ -0,0 +1,211 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package capabilities + +import ( + "fmt" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" +) + +const ( + outputKey = "outputs" + typeKey = "type" +) + +func newOutputsCapability(log *logger.Logger, rd *ruleDefinitions, reporter status.Reporter) (Capability, error) { + if rd == nil { + return &multiOutputsCapability{log: log, caps: []*outputCapability{}}, nil + } + + caps := make([]*outputCapability, 0, len(rd.Capabilities)) + + for _, r := range rd.Capabilities { + c, err := newOutputCapability(log, r, reporter) + if err != nil { + return nil, err + } + + if c != nil { + caps = append(caps, c) + } + } + + return &multiOutputsCapability{log: log, caps: caps}, nil +} + +func newOutputCapability(log *logger.Logger, r ruler, reporter status.Reporter) (*outputCapability, error) { + cap, ok := r.(*outputCapability) + if !ok { + return nil, nil + } + + cap.log = log + cap.reporter = reporter + return cap, nil +} + +type outputCapability struct { + log *logger.Logger + reporter status.Reporter + Name string `json:"name,omitempty" yaml:"name,omitempty"` + Type string `json:"rule" yaml:"rule"` + Output string `json:"output" yaml:"output"` +} + +func (c *outputCapability) Apply(cfgMap map[string]interface{}) (map[string]interface{}, error) { + outputIface, ok := cfgMap[outputKey] + if ok { + outputs, ok := outputIface.(map[string]interface{}) + if ok { + renderedOutputs, err := c.renderOutputs(outputs) + if err != nil { + c.log.Errorf("marking outputs failed for capability '%s': %v", c.name(), err) + return cfgMap, err + } + + cfgMap[outputKey] = renderedOutputs + return cfgMap, nil + } + + return cfgMap, nil + } + + return cfgMap, nil +} + +func (c *outputCapability) Rule() string { + return c.Type +} + +func (c *outputCapability) name() string { + if c.Name != "" { + return c.Name + } + + t := "A" + if c.Type == denyKey { + t = "D" + } + + // e.g OA(*) or OD(logstash) + c.Name = fmt.Sprintf("O%s(%s)", t, c.Output) + return c.Name +} + +func (c *outputCapability) renderOutputs(outputs map[string]interface{}) (map[string]interface{}, error) { + for outputName, outputIface := range outputs { + output, ok := outputIface.(map[string]interface{}) + if !ok { + continue + } + + outputTypeIface, ok := output[typeKey] + if !ok { + return nil, errors.New(fmt.Sprintf("output '%s' is missing type key", outputName), errors.TypeConfig) + } + + outputType, ok := outputTypeIface.(string) + if !ok { + continue + } + + // if input does not match definition continue + if !matchesExpr(c.Output, outputType) { + continue + } + + if _, found := output[conditionKey]; found { + // we already visited + continue + } + + isSupported := c.Type == allowKey + output[conditionKey] = isSupported + outputs[outputName] = output + + if !isSupported { + c.log.Errorf("output '%s' is left out due to capability restriction '%s'", outputName, c.name()) + c.reporter.Update(status.Degraded) + } + } + + return outputs, nil +} + +type multiOutputsCapability struct { + caps []*outputCapability + log *logger.Logger +} + +func (c *multiOutputsCapability) Apply(in interface{}) (interface{}, error) { + configMap, transform, err := configObject(in) + if err != nil { + c.log.Errorf("creating configuration object failed for capability 'multi-outputs': %v", err) + return in, nil + } + if configMap == nil { + return in, nil + } + + for _, cap := range c.caps { + // input capability is not blocking + configMap, err = cap.Apply(configMap) + if err != nil { + return in, err + } + } + + configMap, err = c.cleanupOutput(configMap) + if err != nil { + c.log.Errorf("cleaning up config object failed for capability 'multi-outputs': %v", err) + return in, nil + } + + if transform == nil { + return configMap, nil + } + + return transform(configMap), nil +} + +func (c *multiOutputsCapability) cleanupOutput(cfgMap map[string]interface{}) (map[string]interface{}, error) { + outputsIface, found := cfgMap[outputKey] + if !found { + return cfgMap, nil + } + + outputsMap, ok := outputsIface.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("outputs must be a map") + } + + for outputName, outputIface := range outputsMap { + acceptValue := true + + outputMap, ok := outputIface.(map[string]interface{}) + if ok { + conditionIface, found := outputMap[conditionKey] + if found { + conditionVal, ok := conditionIface.(bool) + if ok { + acceptValue = conditionVal + } + } + } + + if !acceptValue { + delete(outputsMap, outputName) + continue + } + + delete(outputMap, conditionKey) + } + + cfgMap[outputKey] = outputsMap + return cfgMap, nil +} diff --git a/x-pack/elastic-agent/pkg/capabilities/output_test.go b/x-pack/elastic-agent/pkg/capabilities/output_test.go new file mode 100644 index 00000000000..fca32effadc --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/output_test.go @@ -0,0 +1,373 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package capabilities + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" +) + +func TestMultiOutput(t *testing.T) { + tr := &testReporter{} + l, _ := logger.New("test") + t.Run("no match", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{&outputCapability{ + Type: "allow", + Output: "something_else", + }}, + } + + initialOutputs := []string{"elasticsearch", "logstash"} + expectedOutputs := []string{"elasticsearch", "logstash"} + runMultiOutputTest(t, l, rd, expectedOutputs, initialOutputs) + }) + + t.Run("filters logstash", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{&outputCapability{ + Type: "deny", + Output: "logstash", + }}, + } + + initialOutputs := []string{"elasticsearch", "logstash"} + expectedOutputs := []string{"elasticsearch"} + runMultiOutputTest(t, l, rd, expectedOutputs, initialOutputs) + }) + + t.Run("allows logstash only", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{ + &outputCapability{ + Type: "allow", + Output: "logstash", + }, + &outputCapability{ + Type: "deny", + Output: "*", + }, + }, + } + + initialOutputs := []string{"elasticsearch", "logstash"} + expectedOutputs := []string{"logstash"} + runMultiOutputTest(t, l, rd, expectedOutputs, initialOutputs) + }) + + t.Run("allows everything", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{&outputCapability{ + Type: "allow", + Output: "*", + }}, + } + + initialOutputs := []string{"elasticsearch", "logstash"} + expectedOutputs := []string{"elasticsearch", "logstash"} + runMultiOutputTest(t, l, rd, expectedOutputs, initialOutputs) + }) + + t.Run("deny everything", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{&outputCapability{ + Type: "deny", + Output: "*", + }}, + } + + initialOutputs := []string{"elasticsearch", "logstash"} + expectedOutputs := []string{} + runMultiOutputTest(t, l, rd, expectedOutputs, initialOutputs) + }) + + t.Run("keep format", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{&outputCapability{ + Type: "deny", + Output: "logstash", + }}, + } + + initialOutputs := []string{"elasticsearch", "logstash"} + expectedOutputs := []string{"elasticsearch"} + + cap, err := newOutputsCapability(l, rd, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + + outputs := getOutputs(initialOutputs...) + assert.NotNil(t, outputs) + + res, err := cap.Apply(outputs) + assert.NoError(t, err, "should not be failing") + assert.NotEqual(t, ErrBlocked, err, "should not be blocking") + + ast, ok := res.(*transpiler.AST) + assert.True(t, ok, "expecting an ast") + + outputsIface, found := transpiler.Lookup(ast, outputKey) + assert.True(t, found, "output not found") + + outputsDict, ok := outputsIface.Value().(*transpiler.Dict) + assert.True(t, ok, "expecting a Dict for outputs") + + for _, in := range expectedOutputs { + var typeFound bool + nodes := outputsDict.Value().([]transpiler.Node) + for _, outputKeyNode := range nodes { + outputNode, ok := outputKeyNode.(*transpiler.Key).Value().(*transpiler.Dict) + assert.True(t, ok, "output type key not string") + + typeNode, found := outputNode.Find("type") + assert.True(t, found, "type not found") + + typeNodeStr, ok := typeNode.Value().(*transpiler.StrVal) + assert.True(t, ok, "type node not strval") + outputType, ok := typeNodeStr.Value().(string) + assert.True(t, ok, "output type key not string") + if outputType == in { + typeFound = true + break + } + } + + assert.True(t, typeFound, fmt.Sprintf("output '%s' type key not found", in)) + } + }) + + t.Run("unknown action", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{&outputCapability{ + Type: "deny", + Output: "logstash", + }}, + } + + cap, err := newOutputsCapability(l, rd, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + + apiAction := fleetapi.ActionUpgrade{} + outAfter, err := cap.Apply(apiAction) + + assert.NoError(t, err, "should not be failing") + assert.NotEqual(t, ErrBlocked, err, "should not be blocking") + assert.Equal(t, apiAction, outAfter, "action should not be altered") + }) +} + +func TestOutput(t *testing.T) { + tr := &testReporter{} + l, _ := logger.New("test") + t.Run("invalid rule", func(t *testing.T) { + r := &upgradeCapability{} + cap, err := newOutputCapability(l, r, tr) + assert.NoError(t, err, "no error expected") + assert.Nil(t, cap, "cap should not be created") + }) + + t.Run("empty eql", func(t *testing.T) { + r := &outputCapability{ + Type: "allow", + Output: "", + } + cap, err := newOutputCapability(l, r, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + }) + + t.Run("valid action - 1/1 match", func(t *testing.T) { + r := &outputCapability{ + Type: "allow", + Output: "logstash", + } + + initialOutputs := []string{"logstash"} + expectedOutputs := []string{"logstash"} + runOutputTest(t, l, r, expectedOutputs, initialOutputs) + }) + + t.Run("valid action - 0/1 match", func(t *testing.T) { + r := &outputCapability{ + Type: "allow", + Output: "elasticsearch", + } + + initialOutputs := []string{"logstash"} + expectedOutputs := []string{"logstash"} + runOutputTest(t, l, r, expectedOutputs, initialOutputs) + }) + + t.Run("valid action - deny logstash", func(t *testing.T) { + r := &outputCapability{ + Type: "deny", + Output: "logstash", + } + + initialOutputs := []string{"logstash", "elasticsearch"} + expectedOutputs := []string{"elasticsearch"} + runOutputTest(t, l, r, expectedOutputs, initialOutputs) + }) + + t.Run("valid action - multiple outputs 1 explicitely allowed", func(t *testing.T) { + r := &outputCapability{ + Type: "allow", + Output: "logstash", + } + + initialOutputs := []string{"logstash", "elasticsearch"} + expectedOutputs := []string{"logstash", "elasticsearch"} + runOutputTest(t, l, r, expectedOutputs, initialOutputs) + }) +} + +func runMultiOutputTest(t *testing.T, l *logger.Logger, rd *ruleDefinitions, expectedOutputs []string, initialOutputs []string) { + tr := &testReporter{} + cap, err := newOutputsCapability(l, rd, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + + cfg := getOutputsMap(initialOutputs...) + assert.NotNil(t, cfg) + + outAfter, err := cap.Apply(cfg) + + assert.NoError(t, err, "should not be failing") + assert.NotEqual(t, ErrBlocked, err, "should not be blocking") + + newMap, ok := outAfter.(map[string]interface{}) + assert.True(t, ok, "out ast should be a map") + assert.NotNil(t, newMap) + + outputsNode, found := newMap[outputKey] + assert.True(t, found, "outputs not found") + + outputsList, ok := outputsNode.(map[string]interface{}) + assert.True(t, ok, "outputs not a list") + + typesMap := make(map[string]bool) + for _, nodeIface := range outputsList { + node, ok := nodeIface.(map[string]interface{}) + if !ok { + continue + } + + typeNode, ok := node["type"] + if !ok { + continue + } + + outputType, ok := typeNode.(string) + if !ok { + continue + } + typesMap[outputType] = true + } + + assert.Equal(t, len(expectedOutputs), len(typesMap)) + for _, ei := range expectedOutputs { + _, found = typesMap[ei] + assert.True(t, found, fmt.Sprintf("'%s' not found", ei)) + delete(typesMap, ei) + } + + for k := range typesMap { + assert.Fail(t, fmt.Sprintf("'%s' found but was not expected", k)) + } +} + +func runOutputTest(t *testing.T, l *logger.Logger, r *outputCapability, expectedOutputs []string, initialOutputs []string) { + tr := &testReporter{} + cap, err := newOutputCapability(l, r, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + + outputs := getOutputsMap(initialOutputs...) + assert.NotNil(t, outputs) + + newMap, err := cap.Apply(outputs) + assert.NoError(t, err, "should not be failing") + assert.NotEqual(t, ErrBlocked, err, "should not be blocking") + + outputsNode, found := newMap[outputKey] + assert.True(t, found, "outputs not found") + + outputsList, ok := outputsNode.(map[string]interface{}) + assert.True(t, ok, "outputs not a map") + + typesMap := make(map[string]bool) + for _, nodeIface := range outputsList { + node, ok := nodeIface.(map[string]interface{}) + if !ok { + continue + } + + typeNode, ok := node[typeKey] + if !ok { + continue + } + + outputType, ok := typeNode.(string) + if !ok { + continue + } + + conditionNode, ok := node[conditionKey] + if !ok { + // was not allowed nor denied -> allowing + typesMap[outputType] = true + continue + } + + isAllowed, ok := conditionNode.(bool) + if !ok { + assert.Fail(t, fmt.Sprintf("condition should be bool but it's not for output '%s'", outputType)) + continue + } + + if isAllowed { + typesMap[outputType] = true + } + } + + assert.Equal(t, len(expectedOutputs), len(typesMap)) + for _, ei := range expectedOutputs { + _, found = typesMap[ei] + assert.True(t, found, fmt.Sprintf("'%s' not found", ei)) + delete(typesMap, ei) + } + + for k := range typesMap { + assert.Fail(t, fmt.Sprintf("'%s' found but was not expected", k)) + } +} + +func getOutputs(tt ...string) *transpiler.AST { + astMap := getOutputsMap(tt...) + ast, _ := transpiler.NewAST(astMap) + return ast +} + +func getOutputsMap(tt ...string) map[string]interface{} { + cfgMap := make(map[string]interface{}) + outputs := make(map[string]interface{}) + + for i, t := range tt { + outputs[fmt.Sprintf("id%d", i)] = map[string]interface{}{ + "type": t, + "hosts": []string{"testing"}, + } + } + + cfgMap[outputKey] = outputs + return cfgMap +} diff --git a/x-pack/elastic-agent/pkg/capabilities/rule.go b/x-pack/elastic-agent/pkg/capabilities/rule.go new file mode 100644 index 00000000000..93a28067a64 --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/rule.go @@ -0,0 +1,112 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package capabilities + +import ( + "encoding/json" + "fmt" + + "gopkg.in/yaml.v2" +) + +const ( + allowKey = "allow" + denyKey = "deny" + conditionKey = "__condition__" +) + +type ruler interface { + Rule() string +} + +type capabilitiesList []ruler + +type ruleDefinitions struct { + Version string `yaml:"version" json:"version"` + Capabilities capabilitiesList `yaml:"capabilities" json:"capabilities"` +} + +func (r *capabilitiesList) UnmarshalJSON(p []byte) error { + var tmpArray []json.RawMessage + + err := json.Unmarshal(p, &tmpArray) + if err != nil { + return err + } + + for i, t := range tmpArray { + mm := make(map[string]interface{}) + if err := json.Unmarshal(t, &mm); err != nil { + return err + } + + if _, found := mm["input"]; found { + cap := &inputCapability{} + if err := json.Unmarshal(t, &cap); err != nil { + return err + } + (*r) = append((*r), cap) + + } else if _, found = mm["output"]; found { + cap := &outputCapability{} + if err := json.Unmarshal(t, &cap); err != nil { + return err + } + (*r) = append((*r), cap) + + } else if _, found = mm["upgrade"]; found { + cap := &upgradeCapability{} + if err := json.Unmarshal(t, &cap); err != nil { + return err + } + (*r) = append((*r), cap) + } else { + return fmt.Errorf("unexpected capability type for definition number '%d'", i) + } + } + + return nil +} + +func (r *capabilitiesList) UnmarshalYAML(unmarshal func(interface{}) error) error { + var tmpArray []map[string]interface{} + + err := unmarshal(&tmpArray) + if err != nil { + return err + } + + for i, mm := range tmpArray { + partialYaml, err := yaml.Marshal(mm) + if err != nil { + return err + } + if _, found := mm["input"]; found { + cap := &inputCapability{} + if err := yaml.Unmarshal(partialYaml, &cap); err != nil { + return err + } + (*r) = append((*r), cap) + + } else if _, found = mm["output"]; found { + cap := &outputCapability{} + if err := yaml.Unmarshal(partialYaml, &cap); err != nil { + return err + } + (*r) = append((*r), cap) + + } else if _, found = mm["upgrade"]; found { + cap := &upgradeCapability{} + if err := yaml.Unmarshal(partialYaml, &cap); err != nil { + return err + } + (*r) = append((*r), cap) + } else { + return fmt.Errorf("unexpected capability type for definition number '%d'", i) + } + } + + return nil +} diff --git a/x-pack/elastic-agent/pkg/capabilities/rule_test.go b/x-pack/elastic-agent/pkg/capabilities/rule_test.go new file mode 100644 index 00000000000..5f3bab860bf --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/rule_test.go @@ -0,0 +1,122 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package capabilities + +import ( + "encoding/json" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v2" +) + +func TestUnmarshal(t *testing.T) { + t.Run("valid json", func(t *testing.T) { + rr := &ruleDefinitions{Capabilities: make([]ruler, 0, 0)} + + err := json.Unmarshal(jsonDefinitionValid, &rr) + + assert.Nil(t, err, "no error is expected") + assert.Equal(t, 3, len(rr.Capabilities)) + assert.Equal(t, "*capabilities.upgradeCapability", reflect.TypeOf(rr.Capabilities[0]).String()) + assert.Equal(t, "*capabilities.inputCapability", reflect.TypeOf(rr.Capabilities[1]).String()) + assert.Equal(t, "*capabilities.outputCapability", reflect.TypeOf(rr.Capabilities[2]).String()) + }) + + t.Run("invalid json", func(t *testing.T) { + var rr ruleDefinitions + + err := json.Unmarshal(jsonDefinitionInvalid, &rr) + + assert.Error(t, err, "error is expected") + }) + + t.Run("valid yaml", func(t *testing.T) { + rr := &ruleDefinitions{Capabilities: make([]ruler, 0, 0)} + + err := yaml.Unmarshal(yamlDefinitionValid, &rr) + + assert.Nil(t, err, "no error is expected") + assert.Equal(t, 3, len(rr.Capabilities)) + assert.Equal(t, "*capabilities.upgradeCapability", reflect.TypeOf(rr.Capabilities[0]).String()) + assert.Equal(t, "*capabilities.inputCapability", reflect.TypeOf(rr.Capabilities[1]).String()) + assert.Equal(t, "*capabilities.outputCapability", reflect.TypeOf(rr.Capabilities[2]).String()) + }) + + t.Run("invalid yaml", func(t *testing.T) { + var rr ruleDefinitions + + err := yaml.Unmarshal(yamlDefinitionInvalid, &rr) + + assert.Error(t, err, "error is expected") + }) +} + +var jsonDefinitionValid = []byte(`{ +"capabilities": [ + { + "upgrade": "${version} == '8.0.0'", + "rule": "allow" + }, + { + "input": "system/metrics", + "rule": "allow" + }, + { + "output": "elasticsearch", + "rule": "allow" + } +] +}`) + +var jsonDefinitionInvalid = []byte(`{ +"capabilities": [ + { + "upgrade": "${version} == '8.0.0'", + "rule": "allow" +}, +{ + "input": "system/metrics", + "rule": "allow" +}, +{ + "output": "elasticsearch", + "rule": "allow" +}, +{ + "ayay": "elasticsearch", + "rule": "allow" +} +] +}`) + +var yamlDefinitionValid = []byte(`capabilities: +- + rule: "allow" + upgrade: "${version} == '8.0.0'" +- + input: "system/metrics" + rule: "allow" +- + output: "elasticsearch" + rule: "allow" +`) + +var yamlDefinitionInvalid = []byte(` +capabilities: +- + rule: allow + upgrade: "${version} == '8.0.0'" +- + input: "system/metrics" + rule: allow +- + output: elasticsearch + rule: allow +- + ayay: elasticsearch + rule: allow +`) diff --git a/x-pack/elastic-agent/pkg/capabilities/testdata/allow_metrics-capabilities.yml b/x-pack/elastic-agent/pkg/capabilities/testdata/allow_metrics-capabilities.yml new file mode 100644 index 00000000000..408be363fc6 --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/testdata/allow_metrics-capabilities.yml @@ -0,0 +1,6 @@ +version: 0.1.0 +capabilities: +- rule: allow + input: system/metrics +- rule: deny + input: "*" diff --git a/x-pack/elastic-agent/pkg/capabilities/testdata/allow_metrics-config.yml b/x-pack/elastic-agent/pkg/capabilities/testdata/allow_metrics-config.yml new file mode 100644 index 00000000000..9658895c2af --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/testdata/allow_metrics-config.yml @@ -0,0 +1,20 @@ +outputs: + default: + type: elasticsearch + hosts: [127.0.0.1:9200] + username: elastic + password: changeme + +inputs: + - type: system/metrics + data_stream.namespace: default + use_output: default + streams: + - metricset: cpu + data_stream.dataset: system.cpu + - type: system/logs + data_stream.namespace: default + use_output: default + streams: + - paths: "/var/log/file1" + data_stream.dataset: system.var.log diff --git a/x-pack/elastic-agent/pkg/capabilities/testdata/allow_metrics-result.yml b/x-pack/elastic-agent/pkg/capabilities/testdata/allow_metrics-result.yml new file mode 100644 index 00000000000..cf9d4785608 --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/testdata/allow_metrics-result.yml @@ -0,0 +1,14 @@ +outputs: + default: + type: elasticsearch + hosts: [127.0.0.1:9200] + username: elastic + password: changeme + +inputs: + - type: system/metrics + data_stream.namespace: default + use_output: default + streams: + - metricset: cpu + data_stream.dataset: system.cpu diff --git a/x-pack/elastic-agent/pkg/capabilities/testdata/deny_logs-capabilities.yml b/x-pack/elastic-agent/pkg/capabilities/testdata/deny_logs-capabilities.yml new file mode 100644 index 00000000000..47722d814ed --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/testdata/deny_logs-capabilities.yml @@ -0,0 +1,4 @@ +version: 0.1.0 +capabilities: +- rule: deny + input: system/logs diff --git a/x-pack/elastic-agent/pkg/capabilities/testdata/deny_logs-config.yml b/x-pack/elastic-agent/pkg/capabilities/testdata/deny_logs-config.yml new file mode 100644 index 00000000000..9658895c2af --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/testdata/deny_logs-config.yml @@ -0,0 +1,20 @@ +outputs: + default: + type: elasticsearch + hosts: [127.0.0.1:9200] + username: elastic + password: changeme + +inputs: + - type: system/metrics + data_stream.namespace: default + use_output: default + streams: + - metricset: cpu + data_stream.dataset: system.cpu + - type: system/logs + data_stream.namespace: default + use_output: default + streams: + - paths: "/var/log/file1" + data_stream.dataset: system.var.log diff --git a/x-pack/elastic-agent/pkg/capabilities/testdata/deny_logs-result.yml b/x-pack/elastic-agent/pkg/capabilities/testdata/deny_logs-result.yml new file mode 100644 index 00000000000..cf9d4785608 --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/testdata/deny_logs-result.yml @@ -0,0 +1,14 @@ +outputs: + default: + type: elasticsearch + hosts: [127.0.0.1:9200] + username: elastic + password: changeme + +inputs: + - type: system/metrics + data_stream.namespace: default + use_output: default + streams: + - metricset: cpu + data_stream.dataset: system.cpu diff --git a/x-pack/elastic-agent/pkg/capabilities/testdata/filter_metrics-capabilities.yml b/x-pack/elastic-agent/pkg/capabilities/testdata/filter_metrics-capabilities.yml new file mode 100644 index 00000000000..a93e0ffcdb1 --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/testdata/filter_metrics-capabilities.yml @@ -0,0 +1,4 @@ +version: 0.1.0 +capabilities: +- rule: allow + input: system/metrics diff --git a/x-pack/elastic-agent/pkg/capabilities/testdata/filter_metrics-config.yml b/x-pack/elastic-agent/pkg/capabilities/testdata/filter_metrics-config.yml new file mode 100644 index 00000000000..9658895c2af --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/testdata/filter_metrics-config.yml @@ -0,0 +1,20 @@ +outputs: + default: + type: elasticsearch + hosts: [127.0.0.1:9200] + username: elastic + password: changeme + +inputs: + - type: system/metrics + data_stream.namespace: default + use_output: default + streams: + - metricset: cpu + data_stream.dataset: system.cpu + - type: system/logs + data_stream.namespace: default + use_output: default + streams: + - paths: "/var/log/file1" + data_stream.dataset: system.var.log diff --git a/x-pack/elastic-agent/pkg/capabilities/testdata/filter_metrics-result.yml b/x-pack/elastic-agent/pkg/capabilities/testdata/filter_metrics-result.yml new file mode 100644 index 00000000000..9658895c2af --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/testdata/filter_metrics-result.yml @@ -0,0 +1,20 @@ +outputs: + default: + type: elasticsearch + hosts: [127.0.0.1:9200] + username: elastic + password: changeme + +inputs: + - type: system/metrics + data_stream.namespace: default + use_output: default + streams: + - metricset: cpu + data_stream.dataset: system.cpu + - type: system/logs + data_stream.namespace: default + use_output: default + streams: + - paths: "/var/log/file1" + data_stream.dataset: system.var.log diff --git a/x-pack/elastic-agent/pkg/capabilities/testdata/invalid-capabilities.yml b/x-pack/elastic-agent/pkg/capabilities/testdata/invalid-capabilities.yml new file mode 100644 index 00000000000..408be363fc6 --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/testdata/invalid-capabilities.yml @@ -0,0 +1,6 @@ +version: 0.1.0 +capabilities: +- rule: allow + input: system/metrics +- rule: deny + input: "*" diff --git a/x-pack/elastic-agent/pkg/capabilities/testdata/invalid-config.yml b/x-pack/elastic-agent/pkg/capabilities/testdata/invalid-config.yml new file mode 100644 index 00000000000..2133ceac288 --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/testdata/invalid-config.yml @@ -0,0 +1,18 @@ +outputs: + default: + type: elasticsearch + hosts: [127.0.0.1:9200] + username: elastic + password: changeme + +inputs: + - data_stream.namespace: default + use_output: default + streams: + - metricset: cpu + data_stream.dataset: system.cpu + - data_stream.namespace: default + use_output: default + streams: + - paths: "/var/log/file1" + data_stream.dataset: system.var.log diff --git a/x-pack/elastic-agent/pkg/capabilities/testdata/invalid_output-capabilities.yml b/x-pack/elastic-agent/pkg/capabilities/testdata/invalid_output-capabilities.yml new file mode 100644 index 00000000000..13ebde75da0 --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/testdata/invalid_output-capabilities.yml @@ -0,0 +1,4 @@ +version: 0.1.0 +capabilities: +- rule: allow + output: kafka diff --git a/x-pack/elastic-agent/pkg/capabilities/testdata/invalid_output-config.yml b/x-pack/elastic-agent/pkg/capabilities/testdata/invalid_output-config.yml new file mode 100644 index 00000000000..e648faeae2f --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/testdata/invalid_output-config.yml @@ -0,0 +1,19 @@ +outputs: + default: + hosts: [127.0.0.1:9200] + username: elastic + password: changeme + +inputs: + - type: system/metrics + data_stream.namespace: default + use_output: default + streams: + - metricset: cpu + data_stream.dataset: system.cpu + - type: system/logs + data_stream.namespace: default + use_output: default + streams: + - paths: "/var/log/file1" + data_stream.dataset: system.var.log diff --git a/x-pack/elastic-agent/pkg/capabilities/testdata/no_caps-config.yml b/x-pack/elastic-agent/pkg/capabilities/testdata/no_caps-config.yml new file mode 100644 index 00000000000..9658895c2af --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/testdata/no_caps-config.yml @@ -0,0 +1,20 @@ +outputs: + default: + type: elasticsearch + hosts: [127.0.0.1:9200] + username: elastic + password: changeme + +inputs: + - type: system/metrics + data_stream.namespace: default + use_output: default + streams: + - metricset: cpu + data_stream.dataset: system.cpu + - type: system/logs + data_stream.namespace: default + use_output: default + streams: + - paths: "/var/log/file1" + data_stream.dataset: system.var.log diff --git a/x-pack/elastic-agent/pkg/capabilities/testdata/no_caps-result.yml b/x-pack/elastic-agent/pkg/capabilities/testdata/no_caps-result.yml new file mode 100644 index 00000000000..9658895c2af --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/testdata/no_caps-result.yml @@ -0,0 +1,20 @@ +outputs: + default: + type: elasticsearch + hosts: [127.0.0.1:9200] + username: elastic + password: changeme + +inputs: + - type: system/metrics + data_stream.namespace: default + use_output: default + streams: + - metricset: cpu + data_stream.dataset: system.cpu + - type: system/logs + data_stream.namespace: default + use_output: default + streams: + - paths: "/var/log/file1" + data_stream.dataset: system.var.log diff --git a/x-pack/elastic-agent/pkg/capabilities/upgrade.go b/x-pack/elastic-agent/pkg/capabilities/upgrade.go new file mode 100644 index 00000000000..4ca6f9074d4 --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/upgrade.go @@ -0,0 +1,191 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package capabilities + +import ( + "fmt" + "strings" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/eql" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" +) + +const ( + versionKey = "version" + sourceURIKey = "source_uri" +) + +// NewUpgradeCapability creates capability filter for upgrade. +// Available variables: +// - version +// - source_uri +func newUpgradesCapability(log *logger.Logger, rd *ruleDefinitions, reporter status.Reporter) (Capability, error) { + if rd == nil { + return &multiUpgradeCapability{caps: []*upgradeCapability{}}, nil + } + + caps := make([]*upgradeCapability, 0, len(rd.Capabilities)) + + for _, r := range rd.Capabilities { + c, err := newUpgradeCapability(log, r, reporter) + if err != nil { + return nil, err + } + + if c != nil { + caps = append(caps, c) + } + } + + return &multiUpgradeCapability{log: log, caps: caps}, nil +} + +func newUpgradeCapability(log *logger.Logger, r ruler, reporter status.Reporter) (*upgradeCapability, error) { + cap, ok := r.(*upgradeCapability) + if !ok { + return nil, nil + } + + cap.Type = strings.ToLower(cap.Type) + if cap.Type != allowKey && cap.Type != denyKey { + return nil, fmt.Errorf("'%s' is not a valid type 'allow' and 'deny' are supported", cap.Type) + } + + // if eql definition is not supported make a global rule + if len(cap.UpgradeEqlDefinition) == 0 { + cap.UpgradeEqlDefinition = "true" + } + + eqlExp, err := eql.New(cap.UpgradeEqlDefinition) + if err != nil { + return nil, err + } + + cap.upgradeEql = eqlExp + cap.log = log + cap.reporter = reporter + return cap, nil +} + +type upgradeCapability struct { + log *logger.Logger + reporter status.Reporter + Name string `json:"name,omitempty" yaml:"name,omitempty"` + Type string `json:"rule" yaml:"rule"` + // UpgradeEql is eql expression defining upgrade + UpgradeEqlDefinition string `json:"upgrade" yaml:"upgrade"` + + upgradeEql *eql.Expression +} + +func (c *upgradeCapability) Rule() string { + return c.Type +} + +func (c *upgradeCapability) name() string { + if c.Name != "" { + return c.Name + } + + t := "A" + if c.Type == denyKey { + t = "D" + } + + // e.g UA(*) or UD(7.*.*) + c.Name = fmt.Sprintf("U%s(%s)", t, c.UpgradeEqlDefinition) + return c.Name +} + +// Apply supports upgrade action or fleetapi upgrade action object. +func (c *upgradeCapability) Apply(upgradeMap map[string]interface{}) (map[string]interface{}, error) { + // if eql is not parsed or defined skip + if c.upgradeEql == nil { + return upgradeMap, nil + } + + // create VarStore out of map + varStore, err := transpiler.NewAST(upgradeMap) + if err != nil { + c.log.Errorf("failed creating a varStore for capability '%s': %v", c.name(), err) + return upgradeMap, nil + } + + isSupported, err := c.upgradeEql.Eval(varStore) + if err != nil { + c.log.Errorf("failed evaluating eql formula for capability '%s': %v", c.name(), err) + return upgradeMap, nil + } + + // if deny switch the logic + if c.Type == denyKey { + isSupported = !isSupported + c.log.Errorf("upgrade is blocked out due to capability restriction '%s'", c.name()) + c.reporter.Update(status.Degraded) + } + + if !isSupported { + return upgradeMap, ErrBlocked + } + + return upgradeMap, nil +} + +type multiUpgradeCapability struct { + log *logger.Logger + caps []*upgradeCapability +} + +func (c *multiUpgradeCapability) Apply(in interface{}) (interface{}, error) { + upgradeMap := upgradeObject(in) + if upgradeMap == nil { + c.log.Warnf("expecting map config object but got nil for capability 'multi-outputs'") + // not an upgrade we don't alter origin + return in, nil + } + + for _, cap := range c.caps { + // upgrade does not modify incoming action + _, err := cap.Apply(upgradeMap) + if err != nil { + return in, err + } + } + + return in, nil +} + +func upgradeObject(a interface{}) map[string]interface{} { + resultMap := make(map[string]interface{}) + if ua, ok := a.(upgradeAction); ok { + resultMap[versionKey] = ua.Version() + resultMap[sourceURIKey] = ua.SourceURI() + return resultMap + } + + if ua, ok := a.(*fleetapi.ActionUpgrade); ok { + resultMap[versionKey] = ua.Version + resultMap[sourceURIKey] = ua.SourceURI + return resultMap + } + + if ua, ok := a.(fleetapi.ActionUpgrade); ok { + resultMap[versionKey] = ua.Version + resultMap[sourceURIKey] = ua.SourceURI + return resultMap + } + + return nil +} + +type upgradeAction interface { + // Version to upgrade to. + Version() string + // SourceURI for download. + SourceURI() string +} diff --git a/x-pack/elastic-agent/pkg/capabilities/upgrade_test.go b/x-pack/elastic-agent/pkg/capabilities/upgrade_test.go new file mode 100644 index 00000000000..0dc82ed3507 --- /dev/null +++ b/x-pack/elastic-agent/pkg/capabilities/upgrade_test.go @@ -0,0 +1,344 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package capabilities + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" +) + +func TestUpgrade(t *testing.T) { + tr := &testReporter{} + l, _ := logger.New("test") + t.Run("invalid rule", func(t *testing.T) { + r := &inputCapability{} + cap, err := newUpgradeCapability(l, r, tr) + assert.NoError(t, err, "no error expected") + assert.Nil(t, cap, "cap should not be created") + }) + + t.Run("empty eql", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{ + &upgradeCapability{ + Type: "allow", + UpgradeEqlDefinition: "", + }, + }, + } + + cap, err := newUpgradesCapability(l, rd, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + }) + + t.Run("valid action - version match", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{ + &upgradeCapability{ + Type: "allow", + UpgradeEqlDefinition: "${version} == '8.0.0'", + }, + }, + } + cap, err := newUpgradesCapability(l, rd, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + + ta := &testUpgradeAction{version: "8.0.0"} + outAfter, err := cap.Apply(ta) + + assert.NoError(t, err, "should not be failing") + assert.NotEqual(t, ErrBlocked, err, "should not be blocking") + assert.Equal(t, ta, outAfter) + }) + + t.Run("valid action - deny version match", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{ + &upgradeCapability{ + Type: "deny", + UpgradeEqlDefinition: "${version} == '8.0.0'", + }, + }, + } + + cap, err := newUpgradesCapability(l, rd, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + + ta := &testUpgradeAction{version: "8.0.0"} + outAfter, err := cap.Apply(ta) + + assert.Error(t, err, "should fail") + assert.Equal(t, ErrBlocked, err, "should be blocking") + assert.Equal(t, ta, outAfter) + }) + + t.Run("valid action - deny version match", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{ + &upgradeCapability{ + Type: "deny", + UpgradeEqlDefinition: "${version} == '8.*.*'", + }, + }, + } + cap, err := newUpgradesCapability(l, rd, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + + ta := &testUpgradeAction{version: "9.0.0"} + outAfter, err := cap.Apply(ta) + + assert.NotEqual(t, ErrBlocked, err, "should not be blocking") + assert.NoError(t, err, "should not fail") + assert.Equal(t, ta, outAfter) + }) + + t.Run("valid action - version mismmatch", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{ + &upgradeCapability{ + Type: "allow", + UpgradeEqlDefinition: "${version} == '7.12.0'", + }, + }, + } + cap, err := newUpgradesCapability(l, rd, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + + ta := &testUpgradeAction{version: "8.0.0"} + outAfter, err := cap.Apply(ta) + + assert.Equal(t, ErrBlocked, err, "should be blocking") + assert.Error(t, err, "should fail") + assert.Equal(t, ta, outAfter) + }) + + t.Run("valid action - version bug allowed minor mismatch", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{ + &upgradeCapability{ + Type: "allow", + UpgradeEqlDefinition: "match(${version}, '8.0.*')", + }, + }, + } + cap, err := newUpgradesCapability(l, rd, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + + ta := &testUpgradeAction{version: "8.1.0"} + outAfter, err := cap.Apply(ta) + + assert.Equal(t, ErrBlocked, err, "should be blocking") + assert.Error(t, err, "should fail") + assert.Equal(t, ta, outAfter) + }) + + t.Run("valid action - version minor allowed major mismatch", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{ + &upgradeCapability{ + Type: "allow", + UpgradeEqlDefinition: "match(${version}, '8.*.*')", + }, + }, + } + cap, err := newUpgradesCapability(l, rd, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + + ta := &testUpgradeAction{version: "7.157.0"} + outAfter, err := cap.Apply(ta) + + assert.Equal(t, ErrBlocked, err, "should be blocking") + assert.Error(t, err, "should fail") + assert.Equal(t, ta, outAfter) + }) + + t.Run("valid action - version minor allowed minor upgrade", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{ + &upgradeCapability{ + Type: "allow", + UpgradeEqlDefinition: "match(${version}, '8.*.*')", + }, + }, + } + cap, err := newUpgradesCapability(l, rd, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + + ta := &testUpgradeAction{version: "8.2.0"} + outAfter, err := cap.Apply(ta) + + assert.NotEqual(t, ErrBlocked, err, "should not be blocking") + assert.NoError(t, err, "should not fail") + assert.Equal(t, ta, outAfter) + }) + + t.Run("valid fleetatpi.action - version match", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{ + &upgradeCapability{ + Type: "allow", + UpgradeEqlDefinition: "match(${version}, '8.*.*')", + }, + }, + } + cap, err := newUpgradesCapability(l, rd, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + + apiAction := fleetapi.ActionUpgrade{ + ActionID: "", + ActionType: "", + Version: "8.2.0", + SourceURI: "http://artifacts.elastic.co", + } + outAfter, err := cap.Apply(apiAction) + + assert.NotEqual(t, ErrBlocked, err, "should not be blocking") + assert.NoError(t, err, "should not fail") + assert.Equal(t, apiAction, outAfter, "action should not be altered") + }) + + t.Run("valid fleetatpi.action - version mismmatch", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{ + &upgradeCapability{ + Type: "allow", + UpgradeEqlDefinition: "match(${version}, '8.*.*')", + }, + }, + } + cap, err := newUpgradesCapability(l, rd, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + + apiAction := &fleetapi.ActionUpgrade{ + Version: "9.0.0", + SourceURI: "http://artifacts.elastic.co", + } + outAfter, err := cap.Apply(apiAction) + + assert.Equal(t, ErrBlocked, err, "should be blocking") + assert.Error(t, err, "should fail") + assert.Equal(t, apiAction, outAfter, "action should not be altered") + }) + + t.Run("valid fleetatpi.action - version mismmatch", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{ + &upgradeCapability{ + Type: "allow", + UpgradeEqlDefinition: "match(${version}, '8.*.*')", + }, + }, + } + cap, err := newUpgradesCapability(l, rd, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + + apiAction := fleetapi.ActionUpgrade{ + Version: "9.0.0", + SourceURI: "http://artifacts.elastic.co", + } + outAfter, err := cap.Apply(apiAction) + + assert.Equal(t, ErrBlocked, err, "should be blocking") + assert.Error(t, err, "should fail") + assert.Equal(t, apiAction, outAfter, "action should not be altered") + }) + + t.Run("valid action - source uri trusted", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{ + &upgradeCapability{ + Type: "allow", + UpgradeEqlDefinition: "startsWith(${source_uri}, 'https')", + }, + }, + } + cap, err := newUpgradesCapability(l, rd, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + + apiAction := fleetapi.ActionUpgrade{ + Version: "9.0.0", + SourceURI: "https://artifacts.elastic.co", + } + outAfter, err := cap.Apply(apiAction) + + assert.NotEqual(t, ErrBlocked, err, "should not be blocking") + assert.NoError(t, err, "should not fail") + assert.Equal(t, apiAction, outAfter, "action should not be altered") + }) + + t.Run("valid action - source uri untrusted", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{ + &upgradeCapability{ + Type: "allow", + UpgradeEqlDefinition: "startsWith(${source_uri}, 'https')", + }, + }, + } + cap, err := newUpgradesCapability(l, rd, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + + apiAction := fleetapi.ActionUpgrade{ + Version: "9.0.0", + SourceURI: "http://artifacts.elastic.co", + } + outAfter, err := cap.Apply(apiAction) + + assert.Equal(t, ErrBlocked, err, "should be blocking") + assert.Equal(t, apiAction, outAfter, "action should not be altered") + }) + + t.Run("unknown action", func(t *testing.T) { + rd := &ruleDefinitions{ + Capabilities: []ruler{ + &upgradeCapability{ + Type: "allow", + UpgradeEqlDefinition: "startsWith(${source_uri}, 'https')", + }, + }, + } + cap, err := newUpgradesCapability(l, rd, tr) + assert.NoError(t, err, "error not expected, provided eql is valid") + assert.NotNil(t, cap, "cap should be created") + + apiAction := fleetapi.ActionPolicyChange{} + outAfter, err := cap.Apply(apiAction) + + assert.NotEqual(t, ErrBlocked, err, "should not be blocking") + assert.NoError(t, err, "should not fail") + assert.Equal(t, apiAction, outAfter, "action should not be altered") + }) +} + +type testUpgradeAction struct { + version string +} + +// Version to upgrade to. +func (a *testUpgradeAction) Version() string { + return a.version +} + +// SourceURI for download. +func (a *testUpgradeAction) SourceURI() string { + return "http://artifacts.elastic.co" +} diff --git a/x-pack/elastic-agent/pkg/core/status/reporter.go b/x-pack/elastic-agent/pkg/core/status/reporter.go index 9c061b06e00..2d3517a434f 100644 --- a/x-pack/elastic-agent/pkg/core/status/reporter.go +++ b/x-pack/elastic-agent/pkg/core/status/reporter.go @@ -38,6 +38,7 @@ var ( // Controller takes track of component statuses. type Controller interface { Register(string) Reporter + RegisterWithPersistance(string, bool) Reporter Status() AgentStatus StatusString() string UpdateStateID(string) @@ -77,7 +78,9 @@ func (r *controller) UpdateStateID(stateID string) { } rep.lock.Lock() - rep.status = Healthy + if !rep.isPersistent { + rep.status = Healthy + } rep.lock.Unlock() } r.lock.Unlock() @@ -87,6 +90,10 @@ func (r *controller) UpdateStateID(stateID string) { // Register registers new component for status updates. func (r *controller) Register(componentIdentifier string) Reporter { + return r.RegisterWithPersistance(componentIdentifier, false) +} + +func (r *controller) RegisterWithPersistance(componentIdentifier string, persistent bool) Reporter { id := componentIdentifier + "-" + uuid.New().String()[:8] rep := &reporter{ isRegistered: true, @@ -96,6 +103,7 @@ func (r *controller) Register(componentIdentifier string) Reporter { r.lock.Unlock() }, notifyChangeFunc: r.updateStatus, + isPersistent: persistent, } r.lock.Lock() @@ -159,6 +167,7 @@ type Reporter interface { type reporter struct { lock sync.Mutex + isPersistent bool isRegistered bool status AgentStatus unregisterFunc func()