From 1912dd17ce397b6b35b0af08c7050fdd0b1b557b Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 25 Aug 2022 14:13:58 -0400 Subject: [PATCH 1/3] Fix bootstrapping a Fleet Server with v2. --- internal/pkg/agent/application/application.go | 31 ++-- .../application/coordinator/coordinator.go | 4 +- .../application/fleet_server_bootstrap.go | 164 +++++++++++++++++- .../fleet_server_bootstrap_test.go | 57 ++++++ .../pkg/agent/application/managed_mode.go | 54 ++++-- internal/pkg/agent/cmd/run.go | 44 ++--- internal/pkg/testutils/testutils.go | 17 ++ pkg/component/component_test.go | 3 +- pkg/component/runtime/manager.go | 33 ++-- pkg/component/runtime/runtime.go | 2 +- pkg/component/runtime/runtime_comm.go | 15 +- pkg/component/runtime/state.go | 14 +- specs/fleet-server.spec.yml | 36 ++-- 13 files changed, 374 insertions(+), 100 deletions(-) create mode 100644 internal/pkg/agent/application/fleet_server_bootstrap_test.go diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go index 6b3c4b73d42..d4ed9489961 100644 --- a/internal/pkg/agent/application/application.go +++ b/internal/pkg/agent/application/application.go @@ -65,7 +65,7 @@ func New( upgrader := upgrade.NewUpgrader(log, cfg.Settings.DownloadConfig, agentInfo) - runtime, err := runtime.NewManager(log, cfg.Settings.GRPC.String(), tracer) + runtime, err := runtime.NewManager(log, cfg.Settings.GRPC.String(), agentInfo, tracer) if err != nil { return nil, fmt.Errorf("failed to initialize runtime manager: %w", err) } @@ -85,13 +85,6 @@ func New( log.Debugf("Reloading of configuration is on, frequency is set to %s", cfg.Settings.Reload.Period) configMgr = newPeriodic(log, cfg.Settings.Reload.Period, discover, loader) } - } else if configuration.IsFleetServerBootstrap(cfg.Fleet) { - log.Info("Parsed configuration and determined agent is in Fleet Server bootstrap mode") - compModifiers = append(compModifiers, FleetServerComponentModifier) - configMgr, err = newFleetServerBootstrapManager(log) - if err != nil { - return nil, err - } } else { var store storage.Store store, cfg, err = mergeFleetConfig(rawConfig) @@ -99,14 +92,24 @@ func New( return nil, err } - log.Info("Parsed configuration and determined agent is managed by Fleet") + if configuration.IsFleetServerBootstrap(cfg.Fleet) { + log.Info("Parsed configuration and determined agent is in Fleet Server bootstrap mode") - compModifiers = append(compModifiers, FleetServerComponentModifier) - managed, err = newManagedConfigManager(log, agentInfo, cfg, store, runtime) - if err != nil { - return nil, err + compModifiers = append(compModifiers, FleetServerComponentModifier(cfg.Fleet.Server)) + configMgr, err = newFleetServerBootstrapManager(log) + if err != nil { + return nil, err + } + } else { + log.Info("Parsed configuration and determined agent is managed by Fleet") + + compModifiers = append(compModifiers, FleetServerComponentModifier(cfg.Fleet.Server)) + managed, err = newManagedConfigManager(log, agentInfo, cfg, store, runtime) + if err != nil { + return nil, err + } + configMgr = managed } - configMgr = managed } composable, err := composable.New(log, rawConfig) diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 906b9af2d64..30fcdfcce81 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -113,7 +113,7 @@ type VarsManager interface { // ComponentsModifier is a function that takes the computed components model and modifies it before // passing it into the components runtime manager. -type ComponentsModifier func(comps []component.Component, policy map[string]interface{}) ([]component.Component, error) +type ComponentsModifier func(comps []component.Component) ([]component.Component, error) // State provides the current state of the coordinator along with all the current states of components and units. type State struct { @@ -492,7 +492,7 @@ func (c *Coordinator) process(ctx context.Context) (err error) { } for _, modifier := range c.modifiers { - comps, err = modifier(comps, cfg) + comps, err = modifier(comps) if err != nil { return fmt.Errorf("failed to modify components: %w", err) } diff --git a/internal/pkg/agent/application/fleet_server_bootstrap.go b/internal/pkg/agent/application/fleet_server_bootstrap.go index bfb801b9dde..3990cebb3f4 100644 --- a/internal/pkg/agent/application/fleet_server_bootstrap.go +++ b/internal/pkg/agent/application/fleet_server_bootstrap.go @@ -6,35 +6,80 @@ package application import ( "context" - "time" + "fmt" + "gopkg.in/yaml.v2" + + "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" + "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" "github.com/elastic/elastic-agent/internal/pkg/config" "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/core/logger" ) +const ( + elasticsearch = "elasticsearch" + fleetServer = "fleet-server" +) + // injectFleetServerInput is the base configuration that is used plus the FleetServerComponentModifier that adjusts // the components before sending them to the runtime manager. var injectFleetServerInput = config.MustNewConfigFrom(map[string]interface{}{ "outputs": map[string]interface{}{ "default": map[string]interface{}{ - "type": "elasticsearch", + "type": elasticsearch, "hosts": []string{"localhost:9200"}, }, }, "inputs": []interface{}{ map[string]interface{}{ - "type": "fleet-server", + "id": fleetServer, + "type": fleetServer, }, }, }) // FleetServerComponentModifier modifies the comps to inject extra information from the policy into // the Fleet Server component and units needed to run Fleet Server correctly. -func FleetServerComponentModifier(comps []component.Component, policy map[string]interface{}) ([]component.Component, error) { - // TODO(blakerouse): Need to add logic to update the Fleet Server component with extra information from the policy. - return comps, nil +func FleetServerComponentModifier(serverCfg *configuration.FleetServerConfig) coordinator.ComponentsModifier { + return func(comps []component.Component) ([]component.Component, error) { + for i, comp := range comps { + if comp.Spec.InputType == fleetServer { + for j, unit := range comp.Units { + if unit.Type == client.UnitTypeOutput && unit.Config.Type == elasticsearch { + unitCfgMap, err := toMapStr(unit.Config.Source.AsMap(), &serverCfg.Output.Elasticsearch) + if err != nil { + return nil, err + } + fixOutputMap(unitCfgMap) + unitCfg, err := component.ExpectedConfig(unitCfgMap) + if err != nil { + return nil, err + } + unit.Config = unitCfg + } else if unit.Type == client.UnitTypeInput && unit.Config.Type == fleetServer { + unitCfgMap, err := toMapStr(unit.Config.Source.AsMap(), &inputFleetServer{ + Policy: serverCfg.Policy, + Server: serverCfg, + }) + if err != nil { + return nil, err + } + fixInputMap(unitCfgMap) + unitCfg, err := component.ExpectedConfig(unitCfgMap) + if err != nil { + return nil, err + } + unit.Config = unitCfg + } + comp.Units[j] = unit + } + } + comps[i] = comp + } + return comps, nil + } } type fleetServerBootstrapManager struct { @@ -55,9 +100,6 @@ func newFleetServerBootstrapManager( } func (m *fleetServerBootstrapManager) Run(ctx context.Context) error { - ctx, cancel := context.WithTimeout(ctx, 15*time.Second) - defer cancel() - m.log.Debugf("injecting fleet-server for bootstrap") select { case <-ctx.Done(): @@ -76,3 +118,107 @@ func (m *fleetServerBootstrapManager) Errors() <-chan error { func (m *fleetServerBootstrapManager) Watch() <-chan coordinator.ConfigChange { return m.ch } + +func fixOutputMap(m map[string]interface{}) { + // api_key cannot be present or Fleet Server will complain + delete(m, "api_key") +} + +type inputFleetServer struct { + Policy *configuration.FleetServerPolicyConfig `yaml:"policy,omitempty"` + Server *configuration.FleetServerConfig `yaml:"server"` +} + +func fixInputMap(m map[string]interface{}) { + if srv, ok := m["server"]; ok { + if srvMap, ok := srv.(map[string]interface{}); ok { + // bootstrap is internal to Elastic Agent + delete(srvMap, "bootstrap") + // policy is present one level input when sent to Fleet Server + delete(srvMap, "policy") + // output is present in the output unit + delete(srvMap, "output") + } + } +} + +// toMapStr converts the input into a map[string]interface{}. +// +// This is done by using YAMl to marshal and then unmarshal it into the map[string]interface{}. YAML tags on the struct +// match the loading and unloading of the configuration so this ensures that it will match what Fleet Server is +// expecting. +func toMapStr(input ...interface{}) (map[string]interface{}, error) { + m := map[interface{}]interface{}{} + for _, i := range input { + im, err := toMapInterface(i) + if err != nil { + return nil, err + } + m = mergeNestedMaps(m, im) + } + // toMapInterface will set nested maps to a map[interface{}]interface{} which `component.ExpectedConfig` cannot + // handle they must be a map[string]interface{}. + fm := fixYamlMap(m) + r, ok := fm.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("expected map[string]interface{}, got %T", fm) + } + return r, nil +} + +// toMapInterface converts the input into a map[interface{}]interface{} using YAML marshall and unmarshall. +func toMapInterface(input interface{}) (map[interface{}]interface{}, error) { + var res map[interface{}]interface{} + raw, err := yaml.Marshal(input) + if err != nil { + return nil, err + } + err = yaml.Unmarshal(raw, &res) + if err != nil { + return nil, err + } + return res, nil +} + +// mergeNestedMaps merges two map[interface{}]interface{} together deeply. +func mergeNestedMaps(a, b map[interface{}]interface{}) map[interface{}]interface{} { + res := make(map[interface{}]interface{}, len(a)) + for k, v := range a { + res[k] = v + } + for k, v := range b { + if v, ok := v.(map[interface{}]interface{}); ok { + if bv, ok := res[k]; ok { + if bv, ok := bv.(map[interface{}]interface{}); ok { + res[k] = mergeNestedMaps(bv, v) + continue + } + } + } + res[k] = v + } + return res +} + +// fixYamlMap converts map[interface{}]interface{} into map[string]interface{} through out the entire map. +func fixYamlMap(input interface{}) interface{} { + switch i := input.(type) { + case map[string]interface{}: + for k, v := range i { + i[k] = fixYamlMap(v) + } + case map[interface{}]interface{}: + m := map[string]interface{}{} + for k, v := range i { + if ks, ok := k.(string); ok { + m[ks] = fixYamlMap(v) + } + } + return m + case []interface{}: + for j, v := range i { + i[j] = fixYamlMap(v) + } + } + return input +} diff --git a/internal/pkg/agent/application/fleet_server_bootstrap_test.go b/internal/pkg/agent/application/fleet_server_bootstrap_test.go new file mode 100644 index 00000000000..931753e9e90 --- /dev/null +++ b/internal/pkg/agent/application/fleet_server_bootstrap_test.go @@ -0,0 +1,57 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package application + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + + "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" + "github.com/elastic/elastic-agent/internal/pkg/testutils" +) + +func TestFleetServerBootstrapManager(t *testing.T) { + l := testutils.NewErrorLogger(t) + mgr, err := newFleetServerBootstrapManager(l) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + g, _ := errgroup.WithContext(ctx) + + var change coordinator.ConfigChange + g.Go(func() error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-mgr.Errors(): + cancel() + return err + case change = <-mgr.Watch(): + cancel() + } + } + }) + + g.Go(func() error { + return mgr.Run(ctx) + }) + + err = g.Wait() + if err != nil && !errors.Is(err, context.Canceled) { + require.NoError(t, err) + } + + require.NotNil(t, change) + assert.NotNil(t, change.Config()) +} diff --git a/internal/pkg/agent/application/managed_mode.go b/internal/pkg/agent/application/managed_mode.go index 893b7541606..8abeab60eba 100644 --- a/internal/pkg/agent/application/managed_mode.go +++ b/internal/pkg/agent/application/managed_mode.go @@ -146,12 +146,19 @@ func (m *managedConfigManager) Run(ctx context.Context) error { stateRestored = true } - // In the case this is the first start and this Elastic Agent is running a Fleet Server; we need to ensure that + // In the case this Elastic Agent is running a Fleet Server; we need to ensure that // the Fleet Server is running before the Fleet gateway is started. - if !stateRestored && m.cfg.Fleet.Server != nil { - err = m.initFleetServer(ctx) - if err != nil { - return fmt.Errorf("failed to initialize Fleet Server: %w", err) + if m.cfg.Fleet.Server != nil { + if stateRestored { + err = m.waitForFleetServer(ctx) + if err != nil { + return fmt.Errorf("failed to initialize Fleet Server: %w", err) + } + } else { + err = m.initFleetServer(ctx) + if err != nil { + return fmt.Errorf("failed to initialize Fleet Server: %w", err) + } } } @@ -233,31 +240,42 @@ func (m *managedConfigManager) initFleetServer(ctx context.Context) error { case m.ch <- &localConfigChange{injectFleetServerInput}: } - m.log.Debugf("watching fleet-server-default component state") - sub := m.runtime.Subscribe(ctx, "fleet-server-default") + return m.waitForFleetServer(ctx) +} + +func (m *managedConfigManager) waitForFleetServer(ctx context.Context) error { + m.log.Debugf("watching Fleet Server component state") + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + sub := m.runtime.SubscribeAll(ctx) for { select { case <-ctx.Done(): return ctx.Err() - case state := <-sub.Ch(): - if fleetServerRunning(state) { - m.log.With("state", state).Debugf("fleet-server-default component is running") - return nil + case compState := <-sub.Ch(): + if compState.Component.Spec.InputType == "fleet-server" { + if fleetServerRunning(compState.State) { + m.log.With("state", compState.State).Debugf("Fleet Server is running") + return nil + } + m.log.With("state", compState.State).Debugf("Fleet Server is not running") } - m.log.With("state", state).Debugf("fleet-server-default component is not running") } } } func fleetServerRunning(state runtime.ComponentState) bool { - if state.State == client.UnitStateHealthy || state.State == client.UnitStateDegraded { - for key, unit := range state.Units { - if key.UnitType == client.UnitTypeInput && key.UnitID == "fleet-server-default-fleet-server" { - if unit.State == client.UnitStateHealthy || unit.State == client.UnitStateDegraded { - return true - } + if state.State == client.UnitStateHealthy { + if len(state.Units) == 0 { + return false + } + for _, unit := range state.Units { + if unit.State != client.UnitStateHealthy { + return false } } + return true } return false } diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go index ef04422cff9..1867325f7a6 100644 --- a/internal/pkg/agent/cmd/run.go +++ b/internal/pkg/agent/cmd/run.go @@ -50,7 +50,7 @@ func newRunCommandWithArgs(_ []string, streams *cli.IOStreams) *cobra.Command { Use: "run", Short: "Start the elastic-agent.", Run: func(_ *cobra.Command, _ []string) { - if err := run(nil); err != nil { + if err := run(nil); err != nil && !errors.Is(err, context.Canceled) { fmt.Fprintf(streams.Err, "Error: %v\n%s\n", err, troubleshootMessage()) os.Exit(1) } @@ -181,53 +181,53 @@ func run(override cfgOverrider, modifiers ...component.PlatformModifier) error { */ appDone := make(chan bool) - appErrCh := make(chan error) - ctx, cancel = context.WithCancel(context.Background()) - defer cancel() + appErr := make(chan error) go func() { err := app.Run(ctx) close(appDone) - appErrCh <- err + appErr <- err }() // listen for signals signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGHUP) - reexecing := false + isRex := false + logShutdown := true +LOOP: for { - breakout := false select { case <-stop: - breakout = true + break LOOP case <-appDone: - breakout = true + logShutdown = false + break LOOP case <-rex.ShutdownChan(): - reexecing = true - breakout = true + isRex = true + logShutdown = false + break LOOP case sig := <-signals: if sig == syscall.SIGHUP { rexLogger.Infof("SIGHUP triggered re-exec") + isRex = true rex.ReExec(nil) } else { - breakout = true + break LOOP } } - if breakout { - if !reexecing { - logger.Info("Shutting down Elastic Agent and sending last events...") - } - break - } } + if logShutdown { + logger.Info("Shutting down Elastic Agent and sending last events...") + } cancel() - err = <-appErrCh + err = <-appErr - if !reexecing { + if logShutdown { logger.Info("Shutting down completed.") - return err } - rex.ShutdownComplete() + if isRex { + rex.ShutdownComplete() + } return err } diff --git a/internal/pkg/testutils/testutils.go b/internal/pkg/testutils/testutils.go index e1cbf7d34ed..fcd7cbbe2b6 100644 --- a/internal/pkg/testutils/testutils.go +++ b/internal/pkg/testutils/testutils.go @@ -8,6 +8,11 @@ import ( "runtime" "testing" + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent/pkg/core/logger" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/secret" "github.com/elastic/elastic-agent/internal/pkg/agent/storage" ) @@ -23,3 +28,15 @@ func InitStorage(t *testing.T) { } } } + +// NewErrorLogger creates an error logger for testing. +func NewErrorLogger(t *testing.T) *logger.Logger { + t.Helper() + + loggerCfg := logger.DefaultLoggingConfig() + loggerCfg.Level = logp.ErrorLevel + + log, err := logger.NewFromConfig("", loggerCfg, false) + require.NoError(t, err) + return log +} diff --git a/pkg/component/component_test.go b/pkg/component/component_test.go index 03d18172593..06fc30c56c0 100644 --- a/pkg/component/component_test.go +++ b/pkg/component/component_test.go @@ -8,11 +8,12 @@ package component import ( "errors" - "github.com/elastic/elastic-agent-client/v7/pkg/proto" "path/filepath" "sort" "testing" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/pkg/component/runtime/manager.go b/pkg/component/runtime/manager.go index 8fbeeb73ff7..573bb1653da 100644 --- a/pkg/component/runtime/manager.go +++ b/pkg/component/runtime/manager.go @@ -16,6 +16,8 @@ import ( "sync" "time" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" + "github.com/gofrs/uuid" "go.elastic.co/apm" @@ -61,6 +63,7 @@ type Manager struct { logger *logger.Logger ca *authority.CertificateAuthority listenAddr string + agentInfo *info.AgentInfo tracer *apm.Tracer netMx sync.RWMutex @@ -85,7 +88,7 @@ type Manager struct { } // NewManager creates a new manager. -func NewManager(logger *logger.Logger, listenAddr string, tracer *apm.Tracer) (*Manager, error) { +func NewManager(logger *logger.Logger, listenAddr string, agentInfo *info.AgentInfo, tracer *apm.Tracer) (*Manager, error) { ca, err := authority.NewCA() if err != nil { return nil, err @@ -94,6 +97,7 @@ func NewManager(logger *logger.Logger, listenAddr string, tracer *apm.Tracer) (* logger: logger, ca: ca, listenAddr: listenAddr, + agentInfo: agentInfo, tracer: tracer, waitReady: make(map[string]waitForReady), current: make(map[string]*componentRuntimeState), @@ -143,23 +147,32 @@ func (m *Manager) Run(ctx context.Context) error { m.shuttingDown.Store(false) // start serving GRPC connections - errCh := make(chan error) + var wg sync.WaitGroup + wg.Add(1) go func() { - errCh <- server.Serve(lis) + defer wg.Done() + for { + err := server.Serve(lis) + if err != nil { + m.logger.Errorf("control protocol failed: %w", err) + } + if ctx.Err() != nil { + // context has an error don't start again + return + } + } }() - select { - case <-ctx.Done(): - server.Stop() - err = <-errCh - case err = <-errCh: - } + <-ctx.Done() m.shutdown() + + server.Stop() + wg.Wait() m.netMx.Lock() m.listener = nil m.server = nil m.netMx.Unlock() - return err + return ctx.Err() } // WaitForReady waits until the manager is ready to be used. diff --git a/pkg/component/runtime/runtime.go b/pkg/component/runtime/runtime.go index e2c9a2bd013..aae913efac4 100644 --- a/pkg/component/runtime/runtime.go +++ b/pkg/component/runtime/runtime.go @@ -88,7 +88,7 @@ type componentRuntimeState struct { } func newComponentRuntimeState(m *Manager, logger *logger.Logger, comp component.Component) (*componentRuntimeState, error) { - comm, err := newRuntimeComm(logger, m.getListenAddr(), m.ca) + comm, err := newRuntimeComm(logger, m.getListenAddr(), m.ca, m.agentInfo) if err != nil { return nil, err } diff --git a/pkg/component/runtime/runtime_comm.go b/pkg/component/runtime/runtime_comm.go index 622b514c230..4e9b4c23598 100644 --- a/pkg/component/runtime/runtime_comm.go +++ b/pkg/component/runtime/runtime_comm.go @@ -11,6 +11,8 @@ import ( "strings" "sync" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" + protobuf "google.golang.org/protobuf/proto" "github.com/elastic/elastic-agent-client/v7/pkg/client" @@ -40,6 +42,7 @@ type runtimeComm struct { logger *logger.Logger listenAddr string ca *authority.CertificateAuthority + agentInfo *info.AgentInfo name string token string @@ -58,7 +61,7 @@ type runtimeComm struct { actionsResponse chan *proto.ActionResponse } -func newRuntimeComm(logger *logger.Logger, listenAddr string, ca *authority.CertificateAuthority) (*runtimeComm, error) { +func newRuntimeComm(logger *logger.Logger, listenAddr string, ca *authority.CertificateAuthority, agentInfo *info.AgentInfo) (*runtimeComm, error) { token, err := uuid.NewV4() if err != nil { return nil, err @@ -75,6 +78,7 @@ func newRuntimeComm(logger *logger.Logger, listenAddr string, ca *authority.Cert logger: logger, listenAddr: listenAddr, ca: ca, + agentInfo: agentInfo, name: name, token: token.String(), cert: pair, @@ -123,6 +127,15 @@ func (c *runtimeComm) WriteConnInfo(w io.Writer, services ...client.Service) err } func (c *runtimeComm) CheckinExpected(expected *proto.CheckinExpected) { + if c.agentInfo != nil && c.agentInfo.AgentID() != "" { + expected.AgentInfo = &proto.CheckinAgentInfo{ + Id: c.agentInfo.AgentID(), + Version: c.agentInfo.Version(), + Snapshot: c.agentInfo.Snapshot(), + } + } else { + expected.AgentInfo = nil + } c.checkinExpected <- expected } diff --git a/pkg/component/runtime/state.go b/pkg/component/runtime/state.go index b8278d334c7..2bd848cfce0 100644 --- a/pkg/component/runtime/state.go +++ b/pkg/component/runtime/state.go @@ -5,7 +5,6 @@ package runtime import ( - "fmt" "reflect" "github.com/elastic/elastic-agent-client/v7/pkg/client" @@ -227,9 +226,6 @@ func (s *ComponentState) syncCheckin(checkin *proto.CheckinObserved) bool { _, inExpected := s.expectedUnits[key] existing, _ := s.Units[key] existing.unitState = client.UnitState(unit.State) - if existing.unitState == client.UnitStateStopped { - fmt.Printf("stopped") - } existing.unitMessage = unit.Message existing.unitPayload = payload existing.configStateIdx = unit.ConfigStateIdx @@ -273,7 +269,7 @@ func (s *ComponentState) syncCheckin(checkin *proto.CheckinObserved) bool { unit.Message = errMsg unit.Payload = nil } - } else if unit.State != client.UnitStateStarting { + } else if unit.State != client.UnitStateStarting && unit.State != client.UnitStateStopped { if unit.State != client.UnitStateFailed || unit.Message != missingMsg || diffPayload(unit.Payload, nil) { changed = true unit.State = client.UnitStateFailed @@ -364,6 +360,14 @@ func (s *ComponentState) cleanupStopped() bool { } } } + for k, u := range s.Units { + _, ok := s.expectedUnits[k] + if !ok && u.State == client.UnitStateStopped { + // stopped unit that is not expected (remove it) + delete(s.Units, k) + cleaned = true + } + } return cleaned } diff --git a/specs/fleet-server.spec.yml b/specs/fleet-server.spec.yml index e0bf9c996ff..f1e760efe8b 100644 --- a/specs/fleet-server.spec.yml +++ b/specs/fleet-server.spec.yml @@ -1,17 +1,19 @@ -version: 2 -inputs: - - name: fleet-server - description: "Fleet Server" - platforms: - - linux/amd64 - - linux/arm64 - - darwin/amd64 - - darwin/arm64 - - windows/amd64 - - container/amd64 - - container/arm64 - outputs: - - elasticsearch - command: - args: - - "--agent-mode" +version: 2 +inputs: + - name: fleet-server + description: "Fleet Server" + platforms: + - linux/amd64 + - linux/arm64 + - darwin/amd64 + - darwin/arm64 + - windows/amd64 + - container/amd64 + - container/arm64 + outputs: + - elasticsearch + command: + args: + - "--agent-mode" + - "-E" + - "logging.to_stderr=true" From c12dfa301a68f105a2f24ce1dddddae9a76b1fb8 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 25 Aug 2022 14:25:56 -0400 Subject: [PATCH 2/3] Fix lint. --- internal/pkg/agent/application/application.go | 5 +---- internal/pkg/agent/application/fleet_server_bootstrap.go | 4 ++-- .../pkg/agent/application/fleet_server_bootstrap_test.go | 5 ++--- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go index d4ed9489961..c5076535825 100644 --- a/internal/pkg/agent/application/application.go +++ b/internal/pkg/agent/application/application.go @@ -96,10 +96,7 @@ func New( log.Info("Parsed configuration and determined agent is in Fleet Server bootstrap mode") compModifiers = append(compModifiers, FleetServerComponentModifier(cfg.Fleet.Server)) - configMgr, err = newFleetServerBootstrapManager(log) - if err != nil { - return nil, err - } + configMgr = newFleetServerBootstrapManager(log) } else { log.Info("Parsed configuration and determined agent is managed by Fleet") diff --git a/internal/pkg/agent/application/fleet_server_bootstrap.go b/internal/pkg/agent/application/fleet_server_bootstrap.go index 3990cebb3f4..369c63bf53b 100644 --- a/internal/pkg/agent/application/fleet_server_bootstrap.go +++ b/internal/pkg/agent/application/fleet_server_bootstrap.go @@ -91,12 +91,12 @@ type fleetServerBootstrapManager struct { func newFleetServerBootstrapManager( log *logger.Logger, -) (*fleetServerBootstrapManager, error) { +) *fleetServerBootstrapManager { return &fleetServerBootstrapManager{ log: log, ch: make(chan coordinator.ConfigChange), errCh: make(chan error), - }, nil + } } func (m *fleetServerBootstrapManager) Run(ctx context.Context) error { diff --git a/internal/pkg/agent/application/fleet_server_bootstrap_test.go b/internal/pkg/agent/application/fleet_server_bootstrap_test.go index 931753e9e90..53fd864fdb6 100644 --- a/internal/pkg/agent/application/fleet_server_bootstrap_test.go +++ b/internal/pkg/agent/application/fleet_server_bootstrap_test.go @@ -20,8 +20,7 @@ import ( func TestFleetServerBootstrapManager(t *testing.T) { l := testutils.NewErrorLogger(t) - mgr, err := newFleetServerBootstrapManager(l) - require.NoError(t, err) + mgr := newFleetServerBootstrapManager(l) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -47,7 +46,7 @@ func TestFleetServerBootstrapManager(t *testing.T) { return mgr.Run(ctx) }) - err = g.Wait() + err := g.Wait() if err != nil && !errors.Is(err, context.Canceled) { require.NoError(t, err) } From f18d5289c6d0258d2165e8955b4b26308deed851 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 25 Aug 2022 14:44:32 -0400 Subject: [PATCH 3/3] Fix tests. --- pkg/component/runtime/manager_test.go | 110 ++++++++++++++++++++------ 1 file changed, 86 insertions(+), 24 deletions(-) diff --git a/pkg/component/runtime/manager_test.go b/pkg/component/runtime/manager_test.go index a28566c43eb..247f54ccd6e 100644 --- a/pkg/component/runtime/manager_test.go +++ b/pkg/component/runtime/manager_test.go @@ -15,6 +15,8 @@ import ( "testing" "time" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" + "go.elastic.co/apm/apmtest" "github.com/elastic/elastic-agent-libs/logp" @@ -47,11 +49,16 @@ func TestManager_SimpleComponentErr(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m, err := NewManager(newErrorLogger(t), "localhost:0", apmtest.DiscardTracer) + ai, _ := info.NewAgentInfo(true) + m, err := NewManager(newErrorLogger(t), "localhost:0", ai, apmtest.DiscardTracer) require.NoError(t, err) errCh := make(chan error) go func() { - errCh <- m.Run(ctx) + err := m.Run(ctx) + if errors.Is(err, context.Canceled) { + err = nil + } + errCh <- err }() waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) @@ -142,11 +149,16 @@ func TestManager_FakeInput_StartStop(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m, err := NewManager(newErrorLogger(t), "localhost:0", apmtest.DiscardTracer) + ai, _ := info.NewAgentInfo(true) + m, err := NewManager(newErrorLogger(t), "localhost:0", ai, apmtest.DiscardTracer) require.NoError(t, err) errCh := make(chan error) go func() { - errCh <- m.Run(ctx) + err := m.Run(ctx) + if errors.Is(err, context.Canceled) { + err = nil + } + errCh <- err }() waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) @@ -259,11 +271,16 @@ func TestManager_FakeInput_BadUnitToGood(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m, err := NewManager(newErrorLogger(t), "localhost:0", apmtest.DiscardTracer) + ai, _ := info.NewAgentInfo(true) + m, err := NewManager(newErrorLogger(t), "localhost:0", ai, apmtest.DiscardTracer) require.NoError(t, err) errCh := make(chan error) go func() { - errCh <- m.Run(ctx) + err := m.Run(ctx) + if errors.Is(err, context.Canceled) { + err = nil + } + errCh <- err }() waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) @@ -426,11 +443,16 @@ func TestManager_FakeInput_GoodUnitToBad(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m, err := NewManager(newErrorLogger(t), "localhost:0", apmtest.DiscardTracer) + ai, _ := info.NewAgentInfo(true) + m, err := NewManager(newErrorLogger(t), "localhost:0", ai, apmtest.DiscardTracer) require.NoError(t, err) errCh := make(chan error) go func() { - errCh <- m.Run(ctx) + err := m.Run(ctx) + if errors.Is(err, context.Canceled) { + err = nil + } + errCh <- err }() waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) @@ -576,11 +598,16 @@ func TestManager_FakeInput_Configure(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m, err := NewManager(newErrorLogger(t), "localhost:0", apmtest.DiscardTracer) + ai, _ := info.NewAgentInfo(true) + m, err := NewManager(newErrorLogger(t), "localhost:0", ai, apmtest.DiscardTracer) require.NoError(t, err) errCh := make(chan error) go func() { - errCh <- m.Run(ctx) + err := m.Run(ctx) + if errors.Is(err, context.Canceled) { + err = nil + } + errCh <- err }() waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) @@ -698,11 +725,16 @@ func TestManager_FakeInput_RemoveUnit(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m, err := NewManager(newErrorLogger(t), "localhost:0", apmtest.DiscardTracer) + ai, _ := info.NewAgentInfo(true) + m, err := NewManager(newErrorLogger(t), "localhost:0", ai, apmtest.DiscardTracer) require.NoError(t, err) errCh := make(chan error) go func() { - errCh <- m.Run(ctx) + err := m.Run(ctx) + if errors.Is(err, context.Canceled) { + err = nil + } + errCh <- err }() waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) @@ -852,11 +884,16 @@ func TestManager_FakeInput_ActionState(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m, err := NewManager(newErrorLogger(t), "localhost:0", apmtest.DiscardTracer) + ai, _ := info.NewAgentInfo(true) + m, err := NewManager(newErrorLogger(t), "localhost:0", ai, apmtest.DiscardTracer) require.NoError(t, err) errCh := make(chan error) go func() { - errCh <- m.Run(ctx) + err := m.Run(ctx) + if errors.Is(err, context.Canceled) { + err = nil + } + errCh <- err }() waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) @@ -978,11 +1015,16 @@ func TestManager_FakeInput_Restarts(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m, err := NewManager(newErrorLogger(t), "localhost:0", apmtest.DiscardTracer) + ai, _ := info.NewAgentInfo(true) + m, err := NewManager(newErrorLogger(t), "localhost:0", ai, apmtest.DiscardTracer) require.NoError(t, err) errCh := make(chan error) go func() { - errCh <- m.Run(ctx) + err := m.Run(ctx) + if errors.Is(err, context.Canceled) { + err = nil + } + errCh <- err }() waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) @@ -1113,11 +1155,16 @@ func TestManager_FakeInput_RestartsOnMissedCheckins(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m, err := NewManager(newErrorLogger(t), "localhost:0", apmtest.DiscardTracer) + ai, _ := info.NewAgentInfo(true) + m, err := NewManager(newErrorLogger(t), "localhost:0", ai, apmtest.DiscardTracer) require.NoError(t, err) errCh := make(chan error) go func() { - errCh <- m.Run(ctx) + err := m.Run(ctx) + if errors.Is(err, context.Canceled) { + err = nil + } + errCh <- err }() waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) @@ -1230,11 +1277,16 @@ func TestManager_FakeInput_InvalidAction(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m, err := NewManager(newErrorLogger(t), "localhost:0", apmtest.DiscardTracer) + ai, _ := info.NewAgentInfo(true) + m, err := NewManager(newErrorLogger(t), "localhost:0", ai, apmtest.DiscardTracer) require.NoError(t, err) errCh := make(chan error) go func() { - errCh <- m.Run(ctx) + err := m.Run(ctx) + if errors.Is(err, context.Canceled) { + err = nil + } + errCh <- err }() waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) @@ -1350,11 +1402,16 @@ func TestManager_FakeInput_MultiComponent(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m, err := NewManager(newErrorLogger(t), "localhost:0", apmtest.DiscardTracer) + ai, _ := info.NewAgentInfo(true) + m, err := NewManager(newErrorLogger(t), "localhost:0", ai, apmtest.DiscardTracer) require.NoError(t, err) errCh := make(chan error) go func() { - errCh <- m.Run(ctx) + err := m.Run(ctx) + if errors.Is(err, context.Canceled) { + err = nil + } + errCh <- err }() waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) @@ -1558,11 +1615,16 @@ func TestManager_FakeInput_LogLevel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m, err := NewManager(newErrorLogger(t), "localhost:0", apmtest.DiscardTracer) + ai, _ := info.NewAgentInfo(true) + m, err := NewManager(newErrorLogger(t), "localhost:0", ai, apmtest.DiscardTracer) require.NoError(t, err) errCh := make(chan error) go func() { - errCh <- m.Run(ctx) + err := m.Run(ctx) + if errors.Is(err, context.Canceled) { + err = nil + } + errCh <- err }() waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second)