Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ingest Manager] Agent unenroll #19507

Merged
merged 9 commits into from
Jul 7, 2020
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application

import (
"context"
"fmt"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
)

type handlerUnenroll struct {
log *logger.Logger
emitter emitterFunc
dispatcher programsDispatcher
}

func (h *handlerUnenroll) Handle(ctx context.Context, a action, acker fleetAcker) error {
h.log.Debugf("handlerUnenroll: action '%+v' received", a)
action, ok := a.(*fleetapi.ActionUnenroll)
if !ok {
return fmt.Errorf("invalid type, expected ActionUnenroll and received %T", a)
}

// Providing empty map will close all pipelines
noPrograms := make(map[routingKey][]program.Program)
h.dispatcher.Dispatch(a.ID(), noPrograms)

// TODO: clean action store

return acker.Ack(ctx, action)
}
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type emitterFunc func(*config.Config) error
// ConfigHandler is capable of handling config and perform actions at it.
type ConfigHandler interface {
HandleConfig(configrequest.Request) error
Close() error
}

type discoverFunc func() ([]string, error)
Expand Down
9 changes: 9 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,15 @@ func newManaged(
},
)

actionDispatcher.MustRegister(
&fleetapi.ActionUnenroll{},
&handlerUnenroll{
log: log,
emitter: emit,
dispatcher: router,
},
)

actionDispatcher.MustRegister(
&fleetapi.ActionUnknown{},
&handlerUnknown{log: log},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/filters"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
)
Expand Down Expand Up @@ -68,16 +69,16 @@ func testActions() ([]action, error) {
}

type mockStreamStore struct {
store []*configRequest
store []configrequest.Request
}

func newMockStreamStore() *mockStreamStore {
return &mockStreamStore{
store: make([]*configRequest, 0),
store: make([]configrequest.Request, 0),
}
}

func (m *mockStreamStore) Execute(cr *configRequest) error {
func (m *mockStreamStore) Execute(cr configrequest.Request) error {
m.store = append(m.store, cr)
return nil
}
Expand Down
9 changes: 4 additions & 5 deletions x-pack/elastic-agent/pkg/agent/application/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ package application
import (
"fmt"
"strings"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/sorted"
Expand All @@ -19,7 +21,7 @@ var defautlRK = "DEFAULT"
type routingKey = string

type stream interface {
Execute(*configRequest) error
Execute(configrequest.Request) error
Close() error
}

Expand Down Expand Up @@ -72,10 +74,7 @@ func (r *router) Dispatch(id string, grpProg map[routingKey][]program.Program) e
return fmt.Errorf("could not find programs for routing key %s", rk)
}

req := &configRequest{
id: id,
programs: programs.([]program.Program),
}
req := configrequest.New(id, time.Now(), programs.([]program.Program))

r.log.Debugf(
"Streams %s need to run config with ID %s and programs: %s",
Expand Down
3 changes: 2 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)
Expand Down Expand Up @@ -199,7 +200,7 @@ func newMockStream(rk routingKey, notify notifyFunc) *mockStream {
}
}

func (m *mockStream) Execute(req *configRequest) error {
func (m *mockStream) Execute(req configrequest.Request) error {
m.event(executeOp, req)
return nil
}
Expand Down
5 changes: 3 additions & 2 deletions x-pack/elastic-agent/pkg/agent/application/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package application
import (
"context"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation"
operatorCfg "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config"
Expand All @@ -28,10 +29,10 @@ type operatorStream struct {
}

func (b *operatorStream) Close() error {
return b.configHandler.HandleConfig(&configRequest{})
return b.configHandler.Close()
}

func (b *operatorStream) Execute(cfg *configRequest) error {
func (b *operatorStream) Execute(cfg configrequest.Request) error {
return b.configHandler.HandleConfig(cfg)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application
package configrequest

import (
"strings"
Expand All @@ -19,6 +19,15 @@ type configRequest struct {
programs []program.Program
}

// New created a new Request.
func New(id string, createdAt time.Time, programs []program.Program) Request {
return &configRequest{
id: id,
createdAt: createdAt,
programs: programs,
}
}

func (c *configRequest) String() string {
names := c.ProgramNames()
return "[" + c.ShortID() + "] Config: " + strings.Join(names, ", ")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application
package configrequest

import (
"testing"
Expand Down
2 changes: 2 additions & 0 deletions x-pack/elastic-agent/pkg/agent/configrequest/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
// Request is the minimal interface a config request must have.
type Request interface {
ID() string
ShortID() string
CreatedAt() time.Time
Programs() []program.Program
ProgramNames() []string
}
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/operation/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) {
}

func (o *Operator) handleStopSidecar(s configrequest.Step) (result error) {
for _, step := range o.getMonitoringSteps(s) {
for _, step := range o.generateMonitoringSteps(s.Version, nil) {
p, _, err := getProgramFromStepWithTags(step, o.config.DownloadConfig, monitoringTags())
if err != nil {
return errors.New(err,
Expand Down
3 changes: 3 additions & 0 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ func (b *testMonitor) EnrichArgs(_ string, _ string, args []string, _ bool) []st
// Cleanup cleans up all drops.
func (b *testMonitor) Cleanup(string, string) error { return nil }

// Close closes the monitor.
func (b *testMonitor) Close() {}

// Prepare executes steps in order for monitoring to work correctly
func (b *testMonitor) Prepare(string, string, int, int) error { return nil }

Expand Down
7 changes: 7 additions & 0 deletions x-pack/elastic-agent/pkg/agent/operation/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"strings"
"sync"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
Expand Down Expand Up @@ -126,6 +127,12 @@ func (o *Operator) State() map[string]state.State {
return result
}

// Close stops all programs handled by operator
func (o *Operator) Close() error {
o.monitor.Close()
return o.HandleConfig(configrequest.New("", time.Now(), nil))
}

// HandleConfig handles configuration for a pipeline and performs actions to achieve this configuration.
func (o *Operator) HandleConfig(cfg configrequest.Request) error {
_, steps, ack, err := o.stateResolver.Resolve(cfg)
Expand Down
12 changes: 12 additions & 0 deletions x-pack/elastic-agent/pkg/agent/stateresolver/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,10 @@ func (c *cfg) ID() string {
return c.id
}

func (c *cfg) ShortID() string {
return c.id
}

func (c *cfg) Programs() []program.Program {
return c.programs
}
Expand All @@ -354,6 +358,14 @@ func (c *cfg) CreatedAt() time.Time {
return c.createdAt
}

func (c *cfg) ProgramNames() []string {
names := make([]string, 0, len(c.programs))
for _, name := range c.programs {
names = append(names, name.Spec.Name)
}
return names
}

func p(identifier, checksum string) program.Program {
s, ok := program.FindSpecByName(identifier)
if !ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ func (b *Monitor) Reload(rawConfig *config.Config) error {
return nil
}

// Close disables monitoring
func (b *Monitor) Close() {
b.config.Enabled = false
b.config.MonitorMetrics = false
b.config.MonitorLogs = false
}

// IsMonitoringEnabled returns true if monitoring is enabled.
func (b *Monitor) IsMonitoringEnabled() bool { return b.config.Enabled }

Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/core/monitoring/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Monitor interface {
IsMonitoringEnabled() bool
WatchLogs() bool
WatchMetrics() bool
Close()
}

type wrappedConfig struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ func (b *Monitor) Cleanup(string, string) error {
return nil
}

// Close closes the monitor.
func (b *Monitor) Close() {}

// Prepare executes steps in order for monitoring to work correctly
func (b *Monitor) Prepare(string, string, int, int) error {
return nil
Expand Down
30 changes: 30 additions & 0 deletions x-pack/elastic-agent/pkg/fleetapi/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,31 @@ func (a *ActionConfigChange) ID() string {
return a.ActionID
}

// ActionUnenroll is a request for agent to unhook from fleet.
type ActionUnenroll struct {
ActionID string
ActionType string
}

func (a *ActionUnenroll) String() string {
var s strings.Builder
s.WriteString("action_id: ")
s.WriteString(a.ActionID)
s.WriteString(", type: ")
s.WriteString(a.ActionType)
return s.String()
}

// Type returns the type of the Action.
func (a *ActionUnenroll) Type() string {
return a.ActionType
}

// ID returns the ID of the Action.
func (a *ActionUnenroll) ID() string {
return a.ActionID
}

// Actions is a list of Actions to executes and allow to unmarshal heterogenous action type.
type Actions []Action

Expand Down Expand Up @@ -117,6 +142,11 @@ func (a *Actions) UnmarshalJSON(data []byte) error {
"fail to decode CONFIG_CHANGE action",
errors.TypeConfig)
}
case "UNENROLL":
action = &ActionUnenroll{
ActionID: response.ActionID,
ActionType: response.ActionType,
}
default:
action = &ActionUnknown{
ActionID: response.ActionID,
Expand Down