-
Notifications
You must be signed in to change notification settings - Fork 148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[v2] Use the v2 components runtime as the core of the Elastic Agent #753
Changes from all commits
6b66b73
148b1dd
6f0d148
35845b2
a60ae17
a40830c
a00db54
a21aac2
03e10d9
08ba9cf
f30873b
6c1d9f6
e0f4f72
1191d10
c8f9c45
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,15 +15,15 @@ import ( | |
|
||
"gopkg.in/yaml.v2" | ||
|
||
"github.com/elastic/elastic-agent/internal/pkg/agent/application/actions" | ||
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" | ||
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info" | ||
"github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline" | ||
"github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline/actions" | ||
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration" | ||
"github.com/elastic/elastic-agent/internal/pkg/agent/errors" | ||
"github.com/elastic/elastic-agent/internal/pkg/agent/storage" | ||
"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store" | ||
"github.com/elastic/elastic-agent/internal/pkg/config" | ||
"github.com/elastic/elastic-agent/internal/pkg/fleetapi" | ||
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker" | ||
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client" | ||
"github.com/elastic/elastic-agent/internal/pkg/remote" | ||
"github.com/elastic/elastic-agent/pkg/core/logger" | ||
|
@@ -36,28 +36,28 @@ const ( | |
// PolicyChange is a handler for POLICY_CHANGE action. | ||
type PolicyChange struct { | ||
log *logger.Logger | ||
emitter pipeline.EmitterFunc | ||
agentInfo *info.AgentInfo | ||
config *configuration.Configuration | ||
store storage.Store | ||
ch chan coordinator.ConfigChange | ||
setters []actions.ClientSetter | ||
} | ||
|
||
// NewPolicyChange creates a new PolicyChange handler. | ||
func NewPolicyChange( | ||
log *logger.Logger, | ||
emitter pipeline.EmitterFunc, | ||
agentInfo *info.AgentInfo, | ||
config *configuration.Configuration, | ||
store storage.Store, | ||
ch chan coordinator.ConfigChange, | ||
setters ...actions.ClientSetter, | ||
) *PolicyChange { | ||
return &PolicyChange{ | ||
log: log, | ||
emitter: emitter, | ||
agentInfo: agentInfo, | ||
config: config, | ||
store: store, | ||
ch: ch, | ||
setters: setters, | ||
} | ||
} | ||
|
@@ -72,7 +72,7 @@ func (h *PolicyChange) AddSetter(cs actions.ClientSetter) { | |
} | ||
|
||
// Handle handles policy change action. | ||
func (h *PolicyChange) Handle(ctx context.Context, a fleetapi.Action, acker store.FleetAcker) error { | ||
func (h *PolicyChange) Handle(ctx context.Context, a fleetapi.Action, acker acker.Acker) error { | ||
h.log.Debugf("handlerPolicyChange: action '%+v' received", a) | ||
action, ok := a.(*fleetapi.ActionPolicyChange) | ||
if !ok { | ||
|
@@ -89,11 +89,19 @@ func (h *PolicyChange) Handle(ctx context.Context, a fleetapi.Action, acker stor | |
if err != nil { | ||
return err | ||
} | ||
if err := h.emitter(ctx, c); err != nil { | ||
return err | ||
|
||
h.ch <- &policyChange{ | ||
ctx: ctx, | ||
cfg: c, | ||
action: a, | ||
acker: acker, | ||
} | ||
return nil | ||
} | ||
|
||
return acker.Ack(ctx, action) | ||
// Watch returns the channel for configuration change notifications. | ||
func (h *PolicyChange) Watch() <-chan coordinator.ConfigChange { | ||
return h.ch | ||
} | ||
|
||
func (h *PolicyChange) handleFleetServerHosts(ctx context.Context, c *config.Config) (err error) { | ||
|
@@ -210,3 +218,33 @@ func fleetToReader(agentInfo *info.AgentInfo, cfg *configuration.Configuration) | |
} | ||
return bytes.NewReader(data), nil | ||
} | ||
|
||
type policyChange struct { | ||
ctx context.Context | ||
cfg *config.Config | ||
action fleetapi.Action | ||
acker acker.Acker | ||
commit bool | ||
} | ||
|
||
func (l *policyChange) Config() *config.Config { | ||
return l.cfg | ||
} | ||
|
||
func (l *policyChange) Ack() error { | ||
if l.action == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some of these linter items looks like this issue #486 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I thought the lint there was very weird. Going to ignore it in the PR, but we should look into fixing it. |
||
return nil | ||
} | ||
err := l.acker.Ack(l.ctx, l.action) | ||
if err != nil { | ||
return err | ||
} | ||
if l.commit { | ||
return l.acker.Commit(l.ctx) | ||
} | ||
return nil | ||
} | ||
|
||
func (l *policyChange) Fail(_ error) { | ||
// do nothing | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the flatter namespaces, it will make it much easier to find in the future.