Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2] Use the v2 components runtime as the core of the Elastic Agent #753

Merged
merged 15 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ go_env.properties
mage_output_file.go
elastic_agent
fleet.yml
fleet.enc
fleet.enc.lock

# Editor swap files
*.swp
Expand Down
74 changes: 53 additions & 21 deletions control.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,23 @@ package cproto;
option cc_enable_arenas = true;
option go_package = "internal/pkg/agent/control/cproto";

// Status codes for the current state.
enum Status {
// State codes for the current state.
enum State {
STARTING = 0;
CONFIGURING = 1;
HEALTHY = 2;
DEGRADED = 3;
FAILED = 4;
STOPPING = 5;
UPGRADING = 6;
ROLLBACK = 7;
STOPPED = 6;
UPGRADING = 7;
ROLLBACK = 8;
}

// Unit Type running inside a component.
enum UnitType {
INPUT = 0;
OUTPUT = 1;
}

// Action status codes for restart and upgrade response.
Expand Down Expand Up @@ -93,18 +100,43 @@ message UpgradeResponse {
string error = 3;
}

// Current status of the application in Elastic Agent.
message ApplicationStatus {
// Unique application ID.
message ComponentUnitState {
// Type of unit in the component.
UnitType unit_type = 1;
// ID of the unit in the component.
string unit_id = 2;
// Current state.
State state = 3;
// Current state message.
string message = 4;
// Current state payload.
string payload = 5;
}

// Version information reported by the component to Elastic Agent.
message ComponentVersionInfo {
// Name of the component.
string name = 1;
// Version of the component.
string version = 2;
// Extra meta information about the version.
map<string, string> meta = 3;
}

// Current state of a running component by Elastic Agent.
message ComponentState {
// Unique component ID.
string id = 1;
// Application name.
// Component name.
string name = 2;
// Current status.
Status status = 3;
// Current status message.
// Current state.
State state = 3;
// Current state message.
string message = 4;
// Current status payload.
string payload = 5;
// Current units running in the component.
repeated ComponentUnitState units = 5;
// Current version information for the running component.
ComponentVersionInfo version_info = 6;
}

// Current metadata for a running process.
Expand All @@ -126,14 +158,14 @@ message ProcMeta {
string error = 15;
}

// Status is the current status of Elastic Agent.
message StatusResponse {
// Overall status of Elastic Agent.
Status status = 1;
// StateResponse is the current state of Elastic Agent.
message StateResponse {
// Overall state of Elastic Agent.
State state = 1;
// Overall status message of Elastic Agent.
string message = 2;
// Status of each application in Elastic Agent.
repeated ApplicationStatus applications = 3;
// Status of each component in Elastic Agent.
repeated ComponentState components = 3;
}

// ProcMetaResponse is the current running version infomation for all processes.
Expand Down Expand Up @@ -184,8 +216,8 @@ service ElasticAgentControl {
// Fetches the currently running version of the Elastic Agent.
rpc Version(Empty) returns (VersionResponse);

// Fetches the currently status of the Elastic Agent.
rpc Status(Empty) returns (StatusResponse);
// Fetches the currently states of the Elastic Agent.
rpc State(Empty) returns (StateResponse);

// Restart restarts the current running Elastic Agent.
rpc Restart(Empty) returns (RestartResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ package actions
import (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the flatter namespaces, it will make it much easier to find in the future.

"context"

"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
)

// Handler handles action coming from fleet.
type Handler interface {
Handle(ctx context.Context, a fleetapi.Action, acker store.FleetAcker) error
Handle(ctx context.Context, a fleetapi.Action, acker acker.Acker) error
}

// ClientSetter sets the client for communication.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ import (
"fmt"
"time"

"github.com/elastic/elastic-agent-client/v7/pkg/client"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/core/logger"
"github.com/elastic/elastic-agent/pkg/core/server"
)

const (
Expand All @@ -25,27 +28,28 @@ var errActionTimeoutInvalid = errors.New("action timeout is invalid")

// AppAction is a handler for application actions.
type AppAction struct {
log *logger.Logger
srv *server.Server
log *logger.Logger
coord *coordinator.Coordinator
}

// NewAppAction creates a new AppAction handler.
func NewAppAction(log *logger.Logger, srv *server.Server) *AppAction {
func NewAppAction(log *logger.Logger, coord *coordinator.Coordinator) *AppAction {
return &AppAction{
log: log,
srv: srv,
log: log,
coord: coord,
}
}

// Handle handles application action.
func (h *AppAction) Handle(ctx context.Context, a fleetapi.Action, acker store.FleetAcker) error {
func (h *AppAction) Handle(ctx context.Context, a fleetapi.Action, acker acker.Acker) error {
h.log.Debugf("handlerAppAction: action '%+v' received", a)
action, ok := a.(*fleetapi.ActionApp)
if !ok {
return fmt.Errorf("invalid type, expected ActionApp and received %T", a)
}

appState, ok := h.srv.FindByInputType(action.InputType)
state := h.coord.State()
unit, ok := findUnitFromInputType(state, action.InputType)
if !ok {
// If the matching action is not found ack the action with the error for action result document
action.StartedAt = time.Now().UTC().Format(time.RFC3339Nano)
Expand All @@ -71,8 +75,10 @@ func (h *AppAction) Handle(ctx context.Context, a fleetapi.Action, acker store.F

var res map[string]interface{}
if err == nil {
h.log.Debugf("handlerAppAction: action '%v' started with timeout: %v", action.InputType, timeout)
res, err = appState.PerformAction(action.InputType, params, timeout)
h.log.Debugf("handlerAppAction: action '%v' started with timeout: %v", action.ActionType, timeout)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
res, err = h.coord.PerformAction(ctx, unit, action.ActionType, params)
}
end := time.Now().UTC()

Expand Down Expand Up @@ -143,3 +149,17 @@ func readMapString(m map[string]interface{}, key string, def string) string {
}
return def
}

func findUnitFromInputType(state coordinator.State, inputType string) (component.Unit, bool) {
for _, comp := range state.Components {
for _, unit := range comp.Component.Units {
if unit.Type == client.UnitTypeInput {
it, ok := unit.Config["type"]
if ok && it == inputType {
return unit, true
}
}
}
}
return component.Unit{}, false
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"context"
"fmt"

"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
"github.com/elastic/elastic-agent/pkg/core/logger"
)

Expand All @@ -32,7 +32,7 @@ func NewCancel(log *logger.Logger, cancel queueCanceler) *Cancel {
}

// Handle will cancel any actions in the queue that match target_id.
func (h *Cancel) Handle(ctx context.Context, a fleetapi.Action, acker store.FleetAcker) error {
func (h *Cancel) Handle(ctx context.Context, a fleetapi.Action, acker acker.Acker) error {
action, ok := a.(*fleetapi.ActionCancel)
if !ok {
return fmt.Errorf("invalid type, expected ActionCancel and received %T", a)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import (

"gopkg.in/yaml.v2"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/actions"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline/actions"
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
"github.com/elastic/elastic-agent/internal/pkg/remote"
"github.com/elastic/elastic-agent/pkg/core/logger"
Expand All @@ -36,28 +36,28 @@ const (
// PolicyChange is a handler for POLICY_CHANGE action.
type PolicyChange struct {
log *logger.Logger
emitter pipeline.EmitterFunc
agentInfo *info.AgentInfo
config *configuration.Configuration
store storage.Store
ch chan coordinator.ConfigChange
setters []actions.ClientSetter
}

// NewPolicyChange creates a new PolicyChange handler.
func NewPolicyChange(
log *logger.Logger,
emitter pipeline.EmitterFunc,
agentInfo *info.AgentInfo,
config *configuration.Configuration,
store storage.Store,
ch chan coordinator.ConfigChange,
setters ...actions.ClientSetter,
) *PolicyChange {
return &PolicyChange{
log: log,
emitter: emitter,
agentInfo: agentInfo,
config: config,
store: store,
ch: ch,
setters: setters,
}
}
Expand All @@ -72,7 +72,7 @@ func (h *PolicyChange) AddSetter(cs actions.ClientSetter) {
}

// Handle handles policy change action.
func (h *PolicyChange) Handle(ctx context.Context, a fleetapi.Action, acker store.FleetAcker) error {
func (h *PolicyChange) Handle(ctx context.Context, a fleetapi.Action, acker acker.Acker) error {
h.log.Debugf("handlerPolicyChange: action '%+v' received", a)
action, ok := a.(*fleetapi.ActionPolicyChange)
if !ok {
Expand All @@ -89,11 +89,19 @@ func (h *PolicyChange) Handle(ctx context.Context, a fleetapi.Action, acker stor
if err != nil {
return err
}
if err := h.emitter(ctx, c); err != nil {
return err

h.ch <- &policyChange{
ctx: ctx,
cfg: c,
action: a,
acker: acker,
}
return nil
}

return acker.Ack(ctx, action)
// Watch returns the channel for configuration change notifications.
func (h *PolicyChange) Watch() <-chan coordinator.ConfigChange {
return h.ch
}

func (h *PolicyChange) handleFleetServerHosts(ctx context.Context, c *config.Config) (err error) {
Expand Down Expand Up @@ -210,3 +218,33 @@ func fleetToReader(agentInfo *info.AgentInfo, cfg *configuration.Configuration)
}
return bytes.NewReader(data), nil
}

type policyChange struct {
ctx context.Context
cfg *config.Config
action fleetapi.Action
acker acker.Acker
commit bool
}

func (l *policyChange) Config() *config.Config {
return l.cfg
}

func (l *policyChange) Ack() error {
if l.action == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of these linter items looks like this issue #486

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I thought the lint there was very weird. Going to ignore it in the PR, but we should look into fixing it.

return nil
}
err := l.acker.Ack(l.ctx, l.action)
if err != nil {
return err
}
if l.commit {
return l.acker.Commit(l.ctx)
}
return nil
}

func (l *policyChange) Fail(_ error) {
// do nothing
}
Loading