Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ingest Manager] Agent unenroll #19507

Merged
merged 9 commits into from
Jul 7, 2020
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,10 @@ github.com/aerospike/aerospike-client-go v1.27.1-0.20170612174108-0f3b54da6bdc/g
github.com/akavel/rsrc v0.8.0 h1:zjWn7ukO9Kc5Q62DOJCcxGpXC18RawVtYAGdz2aLlfw=
github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20 h1:7rj9qZ63knnVo2ZeepYHvHuRdG76f3tRUTdIQDzRBeI=
github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20/go.mod h1:cI59GRkC2FRaFYtgbYEqMlgnnfvAwXzjojyZKXwklNg=
Expand Down Expand Up @@ -888,6 +890,7 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
43 changes: 33 additions & 10 deletions x-pack/elastic-agent/pkg/agent/application/action_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package application
import (
"context"
"fmt"
"io"

yaml "gopkg.in/yaml.v2"

Expand Down Expand Up @@ -53,7 +54,7 @@ func newActionStore(log *logger.Logger, store storeLoad) (*actionStore, error) {
// any other type of action will be silently ignored.
func (s *actionStore) Add(a action) {
switch v := a.(type) {
case *fleetapi.ActionConfigChange:
case *fleetapi.ActionConfigChange, *fleetapi.ActionUnenroll:
// Only persist the action if the action is different.
if s.action != nil && s.action.ID() == v.ID() {
return
Expand All @@ -69,16 +70,29 @@ func (s *actionStore) Save() error {
return nil
}

apc, ok := s.action.(*fleetapi.ActionConfigChange)
if !ok {
return fmt.Errorf("incompatible type, expected ActionPolicyChange and received %T", s.action)
}
var reader io.Reader
if apc, ok := s.action.(*fleetapi.ActionConfigChange); ok {
serialize := actionConfigChangeSerializer(*apc)

serialize := actionConfigChangeSerializer(*apc)
r, err := yamlToReader(&serialize)
if err != nil {
return err
}

reader, err := yamlToReader(&serialize)
if err != nil {
return err
reader = r
} else if aun, ok := s.action.(*fleetapi.ActionUnenroll); ok {
serialize := actionUnenrollSerializer(*aun)

r, err := yamlToReader(&serialize)
if err != nil {
return err
}

reader = r
}

if reader == nil {
return fmt.Errorf("incompatible type, expected ActionPolicyChange and received %T", s.action)
}

if err := s.store.Save(reader); err != nil {
Expand All @@ -98,7 +112,7 @@ func (s *actionStore) Actions() []action {
return []action{s.action}
}

// actionConfigChangeSerializer is a struct that add YAML serialization, I don't think serialization
// actionConfigChangeSerializer is a struct that adds a YAML serialization, I don't think serialization
// is a concern of the fleetapi package. I went this route so I don't have to do much refactoring.
//
// There are four ways to achieve the same results:
Expand All @@ -117,6 +131,15 @@ type actionConfigChangeSerializer struct {
// Add a guards between the serializer structs and the original struct.
var _ actionConfigChangeSerializer = actionConfigChangeSerializer(fleetapi.ActionConfigChange{})

// actionUnenrollSerializer is a struct that adds a YAML serialization,
type actionUnenrollSerializer struct {
ActionID string `yaml:"action_id"`
ActionType string `yaml:"action_type"`
}

// Add a guards between the serializer structs and the original struct.
var _ actionUnenrollSerializer = actionUnenrollSerializer(fleetapi.ActionUnenroll{})

// actionStoreAcker wraps an existing acker and will send any acked event to the action store,
// its up to the action store to decide if we need to persist the event for future replay or just
// discard the event.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application

import (
"context"
"fmt"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
)

// After running Unenroll agent is in idle state, non managed non standalone.
// For it to be operational again it needs to be either enrolled or reconfigured.
type handlerUnenroll struct {
log *logger.Logger
emitter emitterFunc
dispatcher programsDispatcher
closers []context.CancelFunc
}

func (h *handlerUnenroll) Handle(ctx context.Context, a action, acker fleetAcker) error {
h.log.Debugf("handlerUnenroll: action '%+v' received", a)
action, ok := a.(*fleetapi.ActionUnenroll)
if !ok {
return fmt.Errorf("invalid type, expected ActionUnenroll and received %T", a)
}

// Providing empty map will close all pipelines
noPrograms := make(map[routingKey][]program.Program)
h.dispatcher.Dispatch(a.ID(), noPrograms)

if err := acker.Ack(ctx, action); err != nil {
return err
}

// commit all acks before quitting.
if err := acker.Commit(ctx); err != nil {
return err
}

// close fleet gateway loop
for _, c := range h.closers {
c()
}

// clean action store
// if err := os.Remove(info.AgentActionStoreFile()); err != nil && !os.IsNotExist(err) {
// return errors.New(err, "failed to clear action store")
// }

return nil
}
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type emitterFunc func(*config.Config) error
// ConfigHandler is capable of handling config, perform actions at it, shutdown any long running process.
type ConfigHandler interface {
HandleConfig(configrequest.Request) error
Close() error
Shutdown()
}

Expand Down
31 changes: 30 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Managed struct {
gateway *fleetGateway
router *router
srv *server.Server
as *actionStore
}

func newManaged(
Expand Down Expand Up @@ -164,6 +165,7 @@ func newManaged(
if err != nil {
return nil, errors.New(err, fmt.Sprintf("fail to read action store '%s'", info.AgentActionStoreFile()))
}
managedApplication.as = actionStore
actionAcker := newActionStoreAcker(batchedAcker, actionStore)

actionDispatcher, err := newActionDispatcher(managedApplication.bgContext, log, &handlerDefault{log: log})
Expand All @@ -179,13 +181,24 @@ func newManaged(
},
)

actionDispatcher.MustRegister(
&fleetapi.ActionUnenroll{},
&handlerUnenroll{
log: log,
emitter: emit,
dispatcher: router,
closers: []context.CancelFunc{managedApplication.cancelCtxFn},
},
)

actionDispatcher.MustRegister(
&fleetapi.ActionUnknown{},
&handlerUnknown{log: log},
)

actions := actionStore.Actions()
if len(actions) > 0 {

if len(actions) > 0 && !managedApplication.wasUnenrolled() {
// TODO(ph) We will need an improvement on fleet, if there is an error while dispatching a
// persisted action on disk we should be able to ask Fleet to get the latest configuration.
// But at the moment this is not possible because the policy change was acked.
Expand Down Expand Up @@ -215,6 +228,11 @@ func newManaged(
// Start starts a managed elastic-agent.
func (m *Managed) Start() error {
m.log.Info("Agent is starting")
if m.wasUnenrolled() {
m.log.Warnf("agent was previously unenrolled. To reactivate please reconfigure or enroll again.")
return nil
}

m.gateway.Start()
return nil
}
Expand All @@ -232,3 +250,14 @@ func (m *Managed) Stop() error {
func (m *Managed) AgentInfo() *info.AgentInfo {
return m.agentInfo
}

func (m *Managed) wasUnenrolled() bool {
actions := m.as.Actions()
for _, a := range actions {
if a.Type() == "UNENROLL" {
return true
}
}

return false
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/filters"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
)
Expand Down Expand Up @@ -68,16 +69,16 @@ func testActions() ([]action, error) {
}

type mockStreamStore struct {
store []*configRequest
store []configrequest.Request
}

func newMockStreamStore() *mockStreamStore {
return &mockStreamStore{
store: make([]*configRequest, 0),
store: make([]configrequest.Request, 0),
}
}

func (m *mockStreamStore) Execute(cr *configRequest) error {
func (m *mockStreamStore) Execute(cr configrequest.Request) error {
m.store = append(m.store, cr)
return nil
}
Expand Down
9 changes: 4 additions & 5 deletions x-pack/elastic-agent/pkg/agent/application/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ package application
import (
"fmt"
"strings"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/sorted"
Expand All @@ -19,7 +21,7 @@ var defautlRK = "DEFAULT"
type routingKey = string

type stream interface {
Execute(*configRequest) error
Execute(configrequest.Request) error
Close() error
Shutdown()
}
Expand Down Expand Up @@ -73,10 +75,7 @@ func (r *router) Dispatch(id string, grpProg map[routingKey][]program.Program) e
return fmt.Errorf("could not find programs for routing key %s", rk)
}

req := &configRequest{
id: id,
programs: programs.([]program.Program),
}
req := configrequest.New(id, time.Now(), programs.([]program.Program))

r.log.Debugf(
"Streams %s need to run config with ID %s and programs: %s",
Expand Down
3 changes: 2 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)
Expand Down Expand Up @@ -199,7 +200,7 @@ func newMockStream(rk routingKey, notify notifyFunc) *mockStream {
}
}

func (m *mockStream) Execute(req *configRequest) error {
func (m *mockStream) Execute(req configrequest.Request) error {
m.event(executeOp, req)
return nil
}
Expand Down
5 changes: 3 additions & 2 deletions x-pack/elastic-agent/pkg/agent/application/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package application
import (
"context"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation"
operatorCfg "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config"
Expand All @@ -28,10 +29,10 @@ type operatorStream struct {
}

func (b *operatorStream) Close() error {
return b.configHandler.HandleConfig(&configRequest{})
return b.configHandler.Close()
}

func (b *operatorStream) Execute(cfg *configRequest) error {
func (b *operatorStream) Execute(cfg configrequest.Request) error {
return b.configHandler.HandleConfig(cfg)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application
package configrequest

import (
"strings"
Expand All @@ -19,6 +19,15 @@ type configRequest struct {
programs []program.Program
}

// New created a new Request.
func New(id string, createdAt time.Time, programs []program.Program) Request {
return &configRequest{
id: id,
createdAt: createdAt,
programs: programs,
}
}

func (c *configRequest) String() string {
names := c.ProgramNames()
return "[" + c.ShortID() + "] Config: " + strings.Join(names, ", ")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application
package configrequest

import (
"testing"
Expand Down
2 changes: 2 additions & 0 deletions x-pack/elastic-agent/pkg/agent/configrequest/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
// Request is the minimal interface a config request must have.
type Request interface {
ID() string
ShortID() string
CreatedAt() time.Time
Programs() []program.Program
ProgramNames() []string
}
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/operation/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) {
}

func (o *Operator) handleStopSidecar(s configrequest.Step) (result error) {
for _, step := range o.getMonitoringSteps(s) {
for _, step := range o.generateMonitoringSteps(s.Version, nil) {
p, _, err := getProgramFromStepWithTags(step, o.config.DownloadConfig, monitoringTags())
if err != nil {
return errors.New(err,
Expand Down
3 changes: 3 additions & 0 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ func (b *testMonitor) EnrichArgs(_ string, _ string, args []string, _ bool) []st
// Cleanup cleans up all drops.
func (b *testMonitor) Cleanup(string, string) error { return nil }

// Close closes the monitor.
func (b *testMonitor) Close() {}

// Prepare executes steps in order for monitoring to work correctly
func (b *testMonitor) Prepare(string, string, int, int) error { return nil }

Expand Down
Loading