From 248b1e923e145dfd390e0e8e1efe7badab8802ee Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Tue, 7 Jul 2020 13:23:59 +0200 Subject: [PATCH] [Ingest Manager] Agent unenroll (#19507) --- go.sum | 3 + .../pkg/agent/application/action_store.go | 43 ++++++++++---- .../application/handler_action_unenroll.go | 56 +++++++++++++++++++ .../pkg/agent/application/local_mode.go | 1 + .../pkg/agent/application/managed_mode.go | 31 +++++++++- .../agent/application/managed_mode_test.go | 7 ++- .../pkg/agent/application/router.go | 9 ++- .../pkg/agent/application/router_test.go | 3 +- .../pkg/agent/application/stream.go | 5 +- .../config_request.go | 11 +++- .../config_request_test.go | 2 +- .../pkg/agent/configrequest/request.go | 2 + .../pkg/agent/operation/monitoring.go | 2 +- .../pkg/agent/operation/monitoring_test.go | 3 + .../pkg/agent/operation/operator.go | 7 +++ .../pkg/agent/stateresolver/resolve_test.go | 12 ++++ .../core/monitoring/beats/beats_monitor.go | 7 +++ .../pkg/core/monitoring/monitor.go | 1 + .../pkg/core/monitoring/noop/noop_monitor.go | 3 + x-pack/elastic-agent/pkg/fleetapi/action.go | 30 ++++++++++ 20 files changed, 213 insertions(+), 25 deletions(-) create mode 100644 x-pack/elastic-agent/pkg/agent/application/handler_action_unenroll.go rename x-pack/elastic-agent/pkg/agent/{application => configrequest}/config_request.go (83%) rename x-pack/elastic-agent/pkg/agent/{application => configrequest}/config_request_test.go (97%) diff --git a/go.sum b/go.sum index ef0fde2efdb..4b16629b1f5 100644 --- a/go.sum +++ b/go.sum @@ -99,8 +99,10 @@ github.com/aerospike/aerospike-client-go v1.27.1-0.20170612174108-0f3b54da6bdc/g github.com/akavel/rsrc v0.8.0 h1:zjWn7ukO9Kc5Q62DOJCcxGpXC18RawVtYAGdz2aLlfw= github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20 h1:7rj9qZ63knnVo2ZeepYHvHuRdG76f3tRUTdIQDzRBeI= github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20/go.mod h1:cI59GRkC2FRaFYtgbYEqMlgnnfvAwXzjojyZKXwklNg= @@ -888,6 +890,7 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= +gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/x-pack/elastic-agent/pkg/agent/application/action_store.go b/x-pack/elastic-agent/pkg/agent/application/action_store.go index 2da8d3c0a8f..5e87706638a 100644 --- a/x-pack/elastic-agent/pkg/agent/application/action_store.go +++ b/x-pack/elastic-agent/pkg/agent/application/action_store.go @@ -7,6 +7,7 @@ package application import ( "context" "fmt" + "io" yaml "gopkg.in/yaml.v2" @@ -53,7 +54,7 @@ func newActionStore(log *logger.Logger, store storeLoad) (*actionStore, error) { // any other type of action will be silently ignored. func (s *actionStore) Add(a action) { switch v := a.(type) { - case *fleetapi.ActionConfigChange: + case *fleetapi.ActionConfigChange, *fleetapi.ActionUnenroll: // Only persist the action if the action is different. if s.action != nil && s.action.ID() == v.ID() { return @@ -69,16 +70,29 @@ func (s *actionStore) Save() error { return nil } - apc, ok := s.action.(*fleetapi.ActionConfigChange) - if !ok { - return fmt.Errorf("incompatible type, expected ActionPolicyChange and received %T", s.action) - } + var reader io.Reader + if apc, ok := s.action.(*fleetapi.ActionConfigChange); ok { + serialize := actionConfigChangeSerializer(*apc) - serialize := actionConfigChangeSerializer(*apc) + r, err := yamlToReader(&serialize) + if err != nil { + return err + } - reader, err := yamlToReader(&serialize) - if err != nil { - return err + reader = r + } else if aun, ok := s.action.(*fleetapi.ActionUnenroll); ok { + serialize := actionUnenrollSerializer(*aun) + + r, err := yamlToReader(&serialize) + if err != nil { + return err + } + + reader = r + } + + if reader == nil { + return fmt.Errorf("incompatible type, expected ActionPolicyChange and received %T", s.action) } if err := s.store.Save(reader); err != nil { @@ -98,7 +112,7 @@ func (s *actionStore) Actions() []action { return []action{s.action} } -// actionConfigChangeSerializer is a struct that add YAML serialization, I don't think serialization +// actionConfigChangeSerializer is a struct that adds a YAML serialization, I don't think serialization // is a concern of the fleetapi package. I went this route so I don't have to do much refactoring. // // There are four ways to achieve the same results: @@ -117,6 +131,15 @@ type actionConfigChangeSerializer struct { // Add a guards between the serializer structs and the original struct. var _ actionConfigChangeSerializer = actionConfigChangeSerializer(fleetapi.ActionConfigChange{}) +// actionUnenrollSerializer is a struct that adds a YAML serialization, +type actionUnenrollSerializer struct { + ActionID string `yaml:"action_id"` + ActionType string `yaml:"action_type"` +} + +// Add a guards between the serializer structs and the original struct. +var _ actionUnenrollSerializer = actionUnenrollSerializer(fleetapi.ActionUnenroll{}) + // actionStoreAcker wraps an existing acker and will send any acked event to the action store, // its up to the action store to decide if we need to persist the event for future replay or just // discard the event. diff --git a/x-pack/elastic-agent/pkg/agent/application/handler_action_unenroll.go b/x-pack/elastic-agent/pkg/agent/application/handler_action_unenroll.go new file mode 100644 index 00000000000..5ef04947be9 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/application/handler_action_unenroll.go @@ -0,0 +1,56 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package application + +import ( + "context" + "fmt" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" +) + +// After running Unenroll agent is in idle state, non managed non standalone. +// For it to be operational again it needs to be either enrolled or reconfigured. +type handlerUnenroll struct { + log *logger.Logger + emitter emitterFunc + dispatcher programsDispatcher + closers []context.CancelFunc +} + +func (h *handlerUnenroll) Handle(ctx context.Context, a action, acker fleetAcker) error { + h.log.Debugf("handlerUnenroll: action '%+v' received", a) + action, ok := a.(*fleetapi.ActionUnenroll) + if !ok { + return fmt.Errorf("invalid type, expected ActionUnenroll and received %T", a) + } + + // Providing empty map will close all pipelines + noPrograms := make(map[routingKey][]program.Program) + h.dispatcher.Dispatch(a.ID(), noPrograms) + + if err := acker.Ack(ctx, action); err != nil { + return err + } + + // commit all acks before quitting. + if err := acker.Commit(ctx); err != nil { + return err + } + + // close fleet gateway loop + for _, c := range h.closers { + c() + } + + // clean action store + // if err := os.Remove(info.AgentActionStoreFile()); err != nil && !os.IsNotExist(err) { + // return errors.New(err, "failed to clear action store") + // } + + return nil +} diff --git a/x-pack/elastic-agent/pkg/agent/application/local_mode.go b/x-pack/elastic-agent/pkg/agent/application/local_mode.go index 6c459931534..db6bcc60738 100644 --- a/x-pack/elastic-agent/pkg/agent/application/local_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/local_mode.go @@ -26,6 +26,7 @@ type emitterFunc func(*config.Config) error // ConfigHandler is capable of handling config, perform actions at it, shutdown any long running process. type ConfigHandler interface { HandleConfig(configrequest.Request) error + Close() error Shutdown() } diff --git a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go index 1d9efc5683d..77f787642aa 100644 --- a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go @@ -48,6 +48,7 @@ type Managed struct { gateway *fleetGateway router *router srv *server.Server + as *actionStore } func newManaged( @@ -164,6 +165,7 @@ func newManaged( if err != nil { return nil, errors.New(err, fmt.Sprintf("fail to read action store '%s'", info.AgentActionStoreFile())) } + managedApplication.as = actionStore actionAcker := newActionStoreAcker(batchedAcker, actionStore) actionDispatcher, err := newActionDispatcher(managedApplication.bgContext, log, &handlerDefault{log: log}) @@ -179,13 +181,24 @@ func newManaged( }, ) + actionDispatcher.MustRegister( + &fleetapi.ActionUnenroll{}, + &handlerUnenroll{ + log: log, + emitter: emit, + dispatcher: router, + closers: []context.CancelFunc{managedApplication.cancelCtxFn}, + }, + ) + actionDispatcher.MustRegister( &fleetapi.ActionUnknown{}, &handlerUnknown{log: log}, ) actions := actionStore.Actions() - if len(actions) > 0 { + + if len(actions) > 0 && !managedApplication.wasUnenrolled() { // TODO(ph) We will need an improvement on fleet, if there is an error while dispatching a // persisted action on disk we should be able to ask Fleet to get the latest configuration. // But at the moment this is not possible because the policy change was acked. @@ -215,6 +228,11 @@ func newManaged( // Start starts a managed elastic-agent. func (m *Managed) Start() error { m.log.Info("Agent is starting") + if m.wasUnenrolled() { + m.log.Warnf("agent was previously unenrolled. To reactivate please reconfigure or enroll again.") + return nil + } + m.gateway.Start() return nil } @@ -232,3 +250,14 @@ func (m *Managed) Stop() error { func (m *Managed) AgentInfo() *info.AgentInfo { return m.agentInfo } + +func (m *Managed) wasUnenrolled() bool { + actions := m.as.Actions() + for _, a := range actions { + if a.Type() == "UNENROLL" { + return true + } + } + + return false +} diff --git a/x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go b/x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go index ddef3bc3600..f614bb5afff 100644 --- a/x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go +++ b/x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/filters" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" ) @@ -68,16 +69,16 @@ func testActions() ([]action, error) { } type mockStreamStore struct { - store []*configRequest + store []configrequest.Request } func newMockStreamStore() *mockStreamStore { return &mockStreamStore{ - store: make([]*configRequest, 0), + store: make([]configrequest.Request, 0), } } -func (m *mockStreamStore) Execute(cr *configRequest) error { +func (m *mockStreamStore) Execute(cr configrequest.Request) error { m.store = append(m.store, cr) return nil } diff --git a/x-pack/elastic-agent/pkg/agent/application/router.go b/x-pack/elastic-agent/pkg/agent/application/router.go index 2e3eb0d937b..3da840a2a3d 100644 --- a/x-pack/elastic-agent/pkg/agent/application/router.go +++ b/x-pack/elastic-agent/pkg/agent/application/router.go @@ -7,7 +7,9 @@ package application import ( "fmt" "strings" + "time" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/sorted" @@ -19,7 +21,7 @@ var defautlRK = "DEFAULT" type routingKey = string type stream interface { - Execute(*configRequest) error + Execute(configrequest.Request) error Close() error Shutdown() } @@ -73,10 +75,7 @@ func (r *router) Dispatch(id string, grpProg map[routingKey][]program.Program) e return fmt.Errorf("could not find programs for routing key %s", rk) } - req := &configRequest{ - id: id, - programs: programs.([]program.Program), - } + req := configrequest.New(id, time.Now(), programs.([]program.Program)) r.log.Debugf( "Streams %s need to run config with ID %s and programs: %s", diff --git a/x-pack/elastic-agent/pkg/agent/application/router_test.go b/x-pack/elastic-agent/pkg/agent/application/router_test.go index 4054692281b..2de86d0c769 100644 --- a/x-pack/elastic-agent/pkg/agent/application/router_test.go +++ b/x-pack/elastic-agent/pkg/agent/application/router_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" ) @@ -199,7 +200,7 @@ func newMockStream(rk routingKey, notify notifyFunc) *mockStream { } } -func (m *mockStream) Execute(req *configRequest) error { +func (m *mockStream) Execute(req configrequest.Request) error { m.event(executeOp, req) return nil } diff --git a/x-pack/elastic-agent/pkg/agent/application/stream.go b/x-pack/elastic-agent/pkg/agent/application/stream.go index bfe015c4661..079af6d64b5 100644 --- a/x-pack/elastic-agent/pkg/agent/application/stream.go +++ b/x-pack/elastic-agent/pkg/agent/application/stream.go @@ -7,6 +7,7 @@ package application import ( "context" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation" operatorCfg "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config" @@ -28,10 +29,10 @@ type operatorStream struct { } func (b *operatorStream) Close() error { - return b.configHandler.HandleConfig(&configRequest{}) + return b.configHandler.Close() } -func (b *operatorStream) Execute(cfg *configRequest) error { +func (b *operatorStream) Execute(cfg configrequest.Request) error { return b.configHandler.HandleConfig(cfg) } diff --git a/x-pack/elastic-agent/pkg/agent/application/config_request.go b/x-pack/elastic-agent/pkg/agent/configrequest/config_request.go similarity index 83% rename from x-pack/elastic-agent/pkg/agent/application/config_request.go rename to x-pack/elastic-agent/pkg/agent/configrequest/config_request.go index ae3c067429f..1f08d95de2f 100644 --- a/x-pack/elastic-agent/pkg/agent/application/config_request.go +++ b/x-pack/elastic-agent/pkg/agent/configrequest/config_request.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package application +package configrequest import ( "strings" @@ -19,6 +19,15 @@ type configRequest struct { programs []program.Program } +// New created a new Request. +func New(id string, createdAt time.Time, programs []program.Program) Request { + return &configRequest{ + id: id, + createdAt: createdAt, + programs: programs, + } +} + func (c *configRequest) String() string { names := c.ProgramNames() return "[" + c.ShortID() + "] Config: " + strings.Join(names, ", ") diff --git a/x-pack/elastic-agent/pkg/agent/application/config_request_test.go b/x-pack/elastic-agent/pkg/agent/configrequest/config_request_test.go similarity index 97% rename from x-pack/elastic-agent/pkg/agent/application/config_request_test.go rename to x-pack/elastic-agent/pkg/agent/configrequest/config_request_test.go index f0140aa8ffd..eb294d73650 100644 --- a/x-pack/elastic-agent/pkg/agent/application/config_request_test.go +++ b/x-pack/elastic-agent/pkg/agent/configrequest/config_request_test.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package application +package configrequest import ( "testing" diff --git a/x-pack/elastic-agent/pkg/agent/configrequest/request.go b/x-pack/elastic-agent/pkg/agent/configrequest/request.go index b3cb3063ecc..d5ac8759677 100644 --- a/x-pack/elastic-agent/pkg/agent/configrequest/request.go +++ b/x-pack/elastic-agent/pkg/agent/configrequest/request.go @@ -13,6 +13,8 @@ import ( // Request is the minimal interface a config request must have. type Request interface { ID() string + ShortID() string CreatedAt() time.Time Programs() []program.Program + ProgramNames() []string } diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go index 10947f4d7df..034e47be64a 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go @@ -65,7 +65,7 @@ func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) { } func (o *Operator) handleStopSidecar(s configrequest.Step) (result error) { - for _, step := range o.getMonitoringSteps(s) { + for _, step := range o.generateMonitoringSteps(s.Version, nil) { p, _, err := getProgramFromStepWithTags(step, o.config.DownloadConfig, monitoringTags()) if err != nil { return errors.New(err, diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go index 82df2a402e9..921c5b5e93c 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go @@ -175,6 +175,9 @@ func (b *testMonitor) EnrichArgs(_ string, _ string, args []string, _ bool) []st // Cleanup cleans up all drops. func (b *testMonitor) Cleanup(string, string) error { return nil } +// Close closes the monitor. +func (b *testMonitor) Close() {} + // Prepare executes steps in order for monitoring to work correctly func (b *testMonitor) Prepare(string, string, int, int) error { return nil } diff --git a/x-pack/elastic-agent/pkg/agent/operation/operator.go b/x-pack/elastic-agent/pkg/agent/operation/operator.go index 1790b03a5a0..cb2583860eb 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operator.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operator.go @@ -10,6 +10,7 @@ import ( "os" "strings" "sync" + "time" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" @@ -126,6 +127,12 @@ func (o *Operator) State() map[string]state.State { return result } +// Close stops all programs handled by operator and clears state +func (o *Operator) Close() error { + o.monitor.Close() + return o.HandleConfig(configrequest.New("", time.Now(), nil)) +} + // HandleConfig handles configuration for a pipeline and performs actions to achieve this configuration. func (o *Operator) HandleConfig(cfg configrequest.Request) error { _, steps, ack, err := o.stateResolver.Resolve(cfg) diff --git a/x-pack/elastic-agent/pkg/agent/stateresolver/resolve_test.go b/x-pack/elastic-agent/pkg/agent/stateresolver/resolve_test.go index e6bf53a0ad4..b284ee51b33 100644 --- a/x-pack/elastic-agent/pkg/agent/stateresolver/resolve_test.go +++ b/x-pack/elastic-agent/pkg/agent/stateresolver/resolve_test.go @@ -346,6 +346,10 @@ func (c *cfg) ID() string { return c.id } +func (c *cfg) ShortID() string { + return c.id +} + func (c *cfg) Programs() []program.Program { return c.programs } @@ -354,6 +358,14 @@ func (c *cfg) CreatedAt() time.Time { return c.createdAt } +func (c *cfg) ProgramNames() []string { + names := make([]string, 0, len(c.programs)) + for _, name := range c.programs { + names = append(names, name.Spec.Name) + } + return names +} + func p(identifier, checksum string) program.Program { s, ok := program.FindSpecByName(identifier) if !ok { diff --git a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go index 93a5b8ba549..ac7bf0e7aca 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go +++ b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go @@ -66,6 +66,13 @@ func (b *Monitor) Reload(rawConfig *config.Config) error { return nil } +// Close disables monitoring +func (b *Monitor) Close() { + b.config.Enabled = false + b.config.MonitorMetrics = false + b.config.MonitorLogs = false +} + // IsMonitoringEnabled returns true if monitoring is enabled. func (b *Monitor) IsMonitoringEnabled() bool { return b.config.Enabled } diff --git a/x-pack/elastic-agent/pkg/core/monitoring/monitor.go b/x-pack/elastic-agent/pkg/core/monitoring/monitor.go index 0767c28055a..409938a4d4f 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/monitor.go +++ b/x-pack/elastic-agent/pkg/core/monitoring/monitor.go @@ -25,6 +25,7 @@ type Monitor interface { IsMonitoringEnabled() bool WatchLogs() bool WatchMetrics() bool + Close() } type wrappedConfig struct { diff --git a/x-pack/elastic-agent/pkg/core/monitoring/noop/noop_monitor.go b/x-pack/elastic-agent/pkg/core/monitoring/noop/noop_monitor.go index 97582de7242..ae573d591b9 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/noop/noop_monitor.go +++ b/x-pack/elastic-agent/pkg/core/monitoring/noop/noop_monitor.go @@ -27,6 +27,9 @@ func (b *Monitor) Cleanup(string, string) error { return nil } +// Close closes the monitor. +func (b *Monitor) Close() {} + // Prepare executes steps in order for monitoring to work correctly func (b *Monitor) Prepare(string, string, int, int) error { return nil diff --git a/x-pack/elastic-agent/pkg/fleetapi/action.go b/x-pack/elastic-agent/pkg/fleetapi/action.go index bf59bc22e1a..83d53eb9d06 100644 --- a/x-pack/elastic-agent/pkg/fleetapi/action.go +++ b/x-pack/elastic-agent/pkg/fleetapi/action.go @@ -83,6 +83,31 @@ func (a *ActionConfigChange) ID() string { return a.ActionID } +// ActionUnenroll is a request for agent to unhook from fleet. +type ActionUnenroll struct { + ActionID string + ActionType string +} + +func (a *ActionUnenroll) String() string { + var s strings.Builder + s.WriteString("action_id: ") + s.WriteString(a.ActionID) + s.WriteString(", type: ") + s.WriteString(a.ActionType) + return s.String() +} + +// Type returns the type of the Action. +func (a *ActionUnenroll) Type() string { + return a.ActionType +} + +// ID returns the ID of the Action. +func (a *ActionUnenroll) ID() string { + return a.ActionID +} + // Actions is a list of Actions to executes and allow to unmarshal heterogenous action type. type Actions []Action @@ -117,6 +142,11 @@ func (a *Actions) UnmarshalJSON(data []byte) error { "fail to decode CONFIG_CHANGE action", errors.TypeConfig) } + case "UNENROLL": + action = &ActionUnenroll{ + ActionID: response.ActionID, + ActionType: response.ActionType, + } default: action = &ActionUnknown{ ActionID: response.ActionID,