diff --git a/internal/pkg/agent/application/local_mode.go b/internal/pkg/agent/application/local_mode.go index f06949bcba1..e6496b44860 100644 --- a/internal/pkg/agent/application/local_mode.go +++ b/internal/pkg/agent/application/local_mode.go @@ -119,6 +119,11 @@ func newLocal( return nil, errors.New(err, "failed to initialize composable controller") } + routerArtifactReloader, ok := router.(emitter.Reloader) + if !ok { + return nil, errors.New("router not capable of artifact reload") // Needed for client reloading + } + discover := discoverer(pathConfigFile, cfg.Settings.Path, externalConfigsGlob()) emit, err := emitter.New( localApplication.bgContext, @@ -133,6 +138,7 @@ func newLocal( caps, monitor, artifact.NewReloader(cfg.Settings.DownloadConfig, log), + routerArtifactReloader, ) if err != nil { return nil, err diff --git a/internal/pkg/agent/application/managed_mode.go b/internal/pkg/agent/application/managed_mode.go index 3f98e78fd62..08c43aeeca3 100644 --- a/internal/pkg/agent/application/managed_mode.go +++ b/internal/pkg/agent/application/managed_mode.go @@ -146,6 +146,11 @@ func newManaged( return nil, errors.New(err, "failed to initialize composable controller") } + routerArtifactReloader, ok := router.(emitter.Reloader) + if !ok { + return nil, errors.New("router not capable of artifact reload") // Needed for client reloading + } + emit, err := emitter.New( managedApplication.bgContext, log, @@ -159,6 +164,7 @@ func newManaged( caps, monitor, artifact.NewReloader(cfg.Settings.DownloadConfig, log), + routerArtifactReloader, ) if err != nil { return nil, err diff --git a/internal/pkg/agent/application/pipeline/emitter/controller.go b/internal/pkg/agent/application/pipeline/emitter/controller.go index 7f83961586c..1085046ea64 100644 --- a/internal/pkg/agent/application/pipeline/emitter/controller.go +++ b/internal/pkg/agent/application/pipeline/emitter/controller.go @@ -21,7 +21,7 @@ import ( "github.com/elastic/elastic-agent/pkg/core/logger" ) -type reloadable interface { +type Reloader interface { Reload(cfg *config.Config) error } @@ -32,7 +32,7 @@ type Controller struct { controller composable.Controller router pipeline.Router modifiers *pipeline.ConfigModifiers - reloadables []reloadable + reloadables []Reloader caps capabilities.Capability // state @@ -51,7 +51,7 @@ func NewController( router pipeline.Router, modifiers *pipeline.ConfigModifiers, caps capabilities.Capability, - reloadables ...reloadable, + reloadables ...Reloader, ) *Controller { init, _ := transpiler.NewVars(map[string]interface{}{}, nil) diff --git a/internal/pkg/agent/application/pipeline/emitter/emitter.go b/internal/pkg/agent/application/pipeline/emitter/emitter.go index 7855fb51602..4a42c99d620 100644 --- a/internal/pkg/agent/application/pipeline/emitter/emitter.go +++ b/internal/pkg/agent/application/pipeline/emitter/emitter.go @@ -22,7 +22,7 @@ import ( ) // New creates a new emitter function. -func New(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, controller composable.Controller, router pipeline.Router, modifiers *pipeline.ConfigModifiers, caps capabilities.Capability, reloadables ...reloadable) (pipeline.EmitterFunc, error) { +func New(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, controller composable.Controller, router pipeline.Router, modifiers *pipeline.ConfigModifiers, caps capabilities.Capability, reloadables ...Reloader) (pipeline.EmitterFunc, error) { log.Debugf("Supported programs: %s", strings.Join(program.KnownProgramNames(), ", ")) ctrl := NewController(log, agentInfo, controller, router, modifiers, caps, reloadables...) diff --git a/internal/pkg/agent/application/pipeline/router/router.go b/internal/pkg/agent/application/pipeline/router/router.go index e1f1d63c8b5..274089bbc60 100644 --- a/internal/pkg/agent/application/pipeline/router/router.go +++ b/internal/pkg/agent/application/pipeline/router/router.go @@ -11,8 +11,10 @@ import ( "time" "github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline/emitter" "github.com/elastic/elastic-agent/internal/pkg/agent/configrequest" "github.com/elastic/elastic-agent/internal/pkg/agent/program" + "github.com/elastic/elastic-agent/internal/pkg/config" "github.com/elastic/elastic-agent/internal/pkg/sorted" "github.com/elastic/elastic-agent/pkg/core/logger" ) @@ -35,6 +37,27 @@ func New(log *logger.Logger, factory pipeline.StreamFunc) (pipeline.Router, erro return &router{log: log, streamFactory: factory, routes: sorted.NewSet()}, nil } +func (r *router) Reload(c *config.Config) error { + keys := r.routes.Keys() + for _, key := range keys { + route, found := r.routes.Get(key) + if !found { + continue + } + + routeReloader, ok := route.(emitter.Reloader) + if !ok { + continue + } + + if err := routeReloader.Reload(c); err != nil { + return err + } + } + + return nil +} + func (r *router) Routes() *sorted.Set { return r.routes } diff --git a/internal/pkg/agent/application/pipeline/stream/operator_stream.go b/internal/pkg/agent/application/pipeline/stream/operator_stream.go index ee4ee44079e..9216e12fe82 100644 --- a/internal/pkg/agent/application/pipeline/stream/operator_stream.go +++ b/internal/pkg/agent/application/pipeline/stream/operator_stream.go @@ -10,8 +10,10 @@ import ( "go.elastic.co/apm" "github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline/emitter" "github.com/elastic/elastic-agent/internal/pkg/agent/configrequest" "github.com/elastic/elastic-agent/internal/pkg/agent/program" + "github.com/elastic/elastic-agent/internal/pkg/config" "github.com/elastic/elastic-agent/internal/pkg/core/state" "github.com/elastic/elastic-agent/pkg/core/logger" ) @@ -29,6 +31,15 @@ type specer interface { Specs() map[string]program.Spec } +func (b *operatorStream) Reload(c *config.Config) error { + r, ok := b.configHandler.(emitter.Reloader) + if !ok { + return nil + } + + return r.Reload(c) +} + func (b *operatorStream) Close() error { return b.configHandler.Close() } diff --git a/internal/pkg/agent/operation/operator.go b/internal/pkg/agent/operation/operator.go index 71cb6569671..ed28b7cb633 100644 --- a/internal/pkg/agent/operation/operator.go +++ b/internal/pkg/agent/operation/operator.go @@ -20,9 +20,11 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/program" "github.com/elastic/elastic-agent/internal/pkg/agent/stateresolver" + "github.com/elastic/elastic-agent/internal/pkg/artifact" "github.com/elastic/elastic-agent/internal/pkg/artifact/download" "github.com/elastic/elastic-agent/internal/pkg/artifact/install" "github.com/elastic/elastic-agent/internal/pkg/artifact/uninstall" + "github.com/elastic/elastic-agent/internal/pkg/config" "github.com/elastic/elastic-agent/internal/pkg/core/app" "github.com/elastic/elastic-agent/internal/pkg/core/monitoring" "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/noop" @@ -115,12 +117,51 @@ func NewOperator( operator.initHandlerMap() - os.MkdirAll(config.DownloadConfig.TargetDirectory, 0755) - os.MkdirAll(config.DownloadConfig.InstallPath, 0755) + if err := os.MkdirAll(config.DownloadConfig.TargetDirectory, 0755); err != nil { + // can already exists from previous runs, not an error + logger.Warnf("failed creating %q: %v", config.DownloadConfig.TargetDirectory, err) + } + if err := os.MkdirAll(config.DownloadConfig.InstallPath, 0755); err != nil { + // can already exists from previous runs, not an error + logger.Warnf("failed creating %q: %v", config.DownloadConfig.InstallPath, err) + } return operator, nil } +func (o *Operator) Reload(rawConfig *config.Config) error { + // save some unpacking in downloaders + type reloadConfig struct { + C *artifact.Config `json:"agent.download" config:"agent.download"` + } + tmp := &reloadConfig{ + C: artifact.DefaultConfig(), + } + if err := rawConfig.Unpack(&tmp); err != nil { + return errors.New(err, "failed to unpack artifact config") + } + + if err := o.reloadComponent(o.downloader, "downloader", tmp.C); err != nil { + return err + } + + return o.reloadComponent(o.verifier, "verifier", tmp.C) +} + +func (o *Operator) reloadComponent(component interface{}, name string, cfg *artifact.Config) error { + r, ok := component.(artifact.ConfigReloader) + if !ok { + o.logger.Debugf("failed reloading %q: component is not reloadable", name) + return nil // not an error, could be filesystem downloader/verifier + } + + if err := r.Reload(cfg); err != nil { + return errors.New(err, fmt.Sprintf("failed reloading %q config", component)) + } + + return nil +} + // State describes the current state of the system. // Reports all known applications and theirs states. Whether they are running // or not, and if they are information about process is also present. @@ -238,12 +279,12 @@ func (o *Operator) Shutdown() { a.Shutdown() wg.Done() o.logger.Debugf("took %s to shutdown %s", - time.Now().Sub(started), a.Name()) + time.Since(started), a.Name()) }(a) } wg.Wait() o.logger.Debugf("took %s to shutdown %d apps", - time.Now().Sub(started), len(o.apps)) + time.Since(started), len(o.apps)) } // Start starts a new process based on a configuration diff --git a/internal/pkg/artifact/config.go b/internal/pkg/artifact/config.go index d88031e5de5..76637c28d31 100644 --- a/internal/pkg/artifact/config.go +++ b/internal/pkg/artifact/config.go @@ -25,6 +25,10 @@ const ( defaultSourceURI = "https://artifacts.elastic.co/downloads/" ) +type ConfigReloader interface { + Reload(*Config) error +} + // Config is a configuration used for verifier and downloader type Config struct { // OperatingSystem: operating system [linux, windows, darwin] @@ -53,14 +57,16 @@ type Config struct { } type Reloader struct { - log *logger.Logger - cfg *Config + log *logger.Logger + cfg *Config + reloaders []ConfigReloader } -func NewReloader(cfg *Config, log *logger.Logger) *Reloader { +func NewReloader(cfg *Config, log *logger.Logger, rr ...ConfigReloader) *Reloader { return &Reloader{ - cfg: cfg, - log: log, + cfg: cfg, + log: log, + reloaders: rr, } } @@ -73,6 +79,12 @@ func (r *Reloader) Reload(rawConfig *config.Config) error { return errors.New(err, "failed to reload source URI") } + for _, reloader := range r.reloaders { + if err := reloader.Reload(r.cfg); err != nil { + return errors.New(err, "failed reloading config") + } + } + return nil } diff --git a/internal/pkg/artifact/download/composed/downloader.go b/internal/pkg/artifact/download/composed/downloader.go index 0b8504172f3..06c78fecdd6 100644 --- a/internal/pkg/artifact/download/composed/downloader.go +++ b/internal/pkg/artifact/download/composed/downloader.go @@ -10,7 +10,9 @@ import ( "github.com/hashicorp/go-multierror" "go.elastic.co/apm" + "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/program" + "github.com/elastic/elastic-agent/internal/pkg/artifact" "github.com/elastic/elastic-agent/internal/pkg/artifact/download" ) @@ -50,3 +52,17 @@ func (e *Downloader) Download(ctx context.Context, spec program.Spec, version st return "", err } + +func (e *Downloader) Reload(c *artifact.Config) error { + for _, d := range e.dd { + reloadable, ok := d.(download.Reloader) + if !ok { + continue + } + + if err := reloadable.Reload(c); err != nil { + return errors.New(err, "failed reloading artifact config for composed downloader") + } + } + return nil +} diff --git a/internal/pkg/artifact/download/composed/verifier.go b/internal/pkg/artifact/download/composed/verifier.go index 9fb60d20007..ec99dfa4b83 100644 --- a/internal/pkg/artifact/download/composed/verifier.go +++ b/internal/pkg/artifact/download/composed/verifier.go @@ -5,11 +5,11 @@ package composed import ( - "errors" - "github.com/hashicorp/go-multierror" + "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/program" + "github.com/elastic/elastic-agent/internal/pkg/artifact" "github.com/elastic/elastic-agent/internal/pkg/artifact/download" ) @@ -54,3 +54,17 @@ func (e *Verifier) Verify(spec program.Spec, version string) error { return err } + +func (e *Verifier) Reload(c *artifact.Config) error { + for _, v := range e.vv { + reloadable, ok := v.(download.Reloader) + if !ok { + continue + } + + if err := reloadable.Reload(c); err != nil { + return errors.New(err, "failed reloading artifact config for composed verifier") + } + } + return nil +} diff --git a/internal/pkg/artifact/download/http/downloader.go b/internal/pkg/artifact/download/http/downloader.go index 2da6e3d1015..9dba5783bc7 100644 --- a/internal/pkg/artifact/download/http/downloader.go +++ b/internal/pkg/artifact/download/http/downloader.go @@ -20,7 +20,6 @@ import ( "github.com/elastic/elastic-agent-libs/atomic" "github.com/elastic/elastic-agent-libs/transport/httpcommon" - "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/program" "github.com/elastic/elastic-agent/internal/pkg/artifact" @@ -73,6 +72,23 @@ func NewDownloaderWithClient(log progressLogger, config *artifact.Config, client } } +func (e *Downloader) Reload(c *artifact.Config) error { + // reload client + client, err := c.HTTPTransportSettings.Client( + httpcommon.WithAPMHTTPInstrumentation(), + ) + if err != nil { + return errors.New(err, "http.downloader: failed to generate client out of config") + } + + client.Transport = withHeaders(client.Transport, headers) + + e.client = *client + e.config = c + + return nil +} + // Download fetches the package from configured source. // Returns absolute path to downloaded package and an error. func (e *Downloader) Download(ctx context.Context, spec program.Spec, version string) (_ string, err error) { @@ -80,7 +96,9 @@ func (e *Downloader) Download(ctx context.Context, spec program.Spec, version st defer func() { if err != nil { for _, path := range downloadedFiles { - os.Remove(path) + if err := os.Remove(path); err != nil { + e.log.Warnf("failed to cleanup %s: %v", path, err) + } } } }() @@ -164,12 +182,14 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f resp, err := e.client.Do(req.WithContext(ctx)) if err != nil { - return "", errors.New(err, "fetching package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI)) + // return path, file already exists and needs to be cleaned up + return fullPath, errors.New(err, "fetching package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI)) } defer resp.Body.Close() if resp.StatusCode != 200 { - return "", errors.New(fmt.Sprintf("call to '%s' returned unsuccessful status code: %d", sourceURI, resp.StatusCode), errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI)) + // return path, file already exists and needs to be cleaned up + return fullPath, errors.New(fmt.Sprintf("call to '%s' returned unsuccessful status code: %d", sourceURI, resp.StatusCode), errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI)) } fileSize := -1 @@ -186,7 +206,8 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f if err != nil { reportCancel() dp.ReportFailed(err) - return "", errors.New(err, "fetching package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI)) + // return path, file already exists and needs to be cleaned up + return fullPath, errors.New(err, "copying fetched package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI)) } reportCancel() dp.ReportComplete() diff --git a/internal/pkg/artifact/download/http/verifier.go b/internal/pkg/artifact/download/http/verifier.go index 1fe855fa2af..4234ccae93a 100644 --- a/internal/pkg/artifact/download/http/verifier.go +++ b/internal/pkg/artifact/download/http/verifier.go @@ -60,6 +60,24 @@ func NewVerifier(config *artifact.Config, allowEmptyPgp bool, pgp []byte) (*Veri return v, nil } +func (v *Verifier) Reload(c *artifact.Config) error { + // reload client + client, err := c.HTTPTransportSettings.Client( + httpcommon.WithAPMHTTPInstrumentation(), + httpcommon.WithModRoundtripper(func(rt http.RoundTripper) http.RoundTripper { + return withHeaders(rt, headers) + }), + ) + if err != nil { + return errors.New(err, "http.verifier: failed to generate client out of config") + } + + v.client = *client + v.config = c + + return nil +} + // Verify checks downloaded package on preconfigured // location against a key stored on elastic.co website. func (v *Verifier) Verify(spec program.Spec, version string) error { diff --git a/internal/pkg/artifact/download/reloadable.go b/internal/pkg/artifact/download/reloadable.go new file mode 100644 index 00000000000..27845510316 --- /dev/null +++ b/internal/pkg/artifact/download/reloadable.go @@ -0,0 +1,14 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package download + +import ( + "github.com/elastic/elastic-agent/internal/pkg/artifact" +) + +// Reloader is an interface allowing to reload artifact config +type Reloader interface { + Reload(*artifact.Config) error +} diff --git a/internal/pkg/artifact/download/snapshot/downloader.go b/internal/pkg/artifact/download/snapshot/downloader.go index c3680147927..619c95a4c5a 100644 --- a/internal/pkg/artifact/download/snapshot/downloader.go +++ b/internal/pkg/artifact/download/snapshot/downloader.go @@ -5,11 +5,14 @@ package snapshot import ( + "context" "encoding/json" "fmt" "strings" "github.com/elastic/elastic-agent-libs/transport/httpcommon" + "github.com/elastic/elastic-agent/internal/pkg/agent/errors" + "github.com/elastic/elastic-agent/internal/pkg/agent/program" "github.com/elastic/elastic-agent/internal/pkg/artifact" "github.com/elastic/elastic-agent/internal/pkg/artifact/download" "github.com/elastic/elastic-agent/internal/pkg/artifact/download/http" @@ -17,6 +20,11 @@ import ( "github.com/elastic/elastic-agent/pkg/core/logger" ) +type Downloader struct { + downloader download.Downloader + versionOverride string +} + // NewDownloader creates a downloader which first checks local directory // and then fallbacks to remote if configured. func NewDownloader(log *logger.Logger, config *artifact.Config, versionOverride string) (download.Downloader, error) { @@ -24,7 +32,36 @@ func NewDownloader(log *logger.Logger, config *artifact.Config, versionOverride if err != nil { return nil, err } - return http.NewDownloader(log, cfg) + + httpDownloader, err := http.NewDownloader(log, cfg) + if err != nil { + return nil, errors.New(err, "failed to create snapshot downloader") + } + + return &Downloader{ + downloader: httpDownloader, + versionOverride: versionOverride, + }, nil +} + +func (e *Downloader) Reload(c *artifact.Config) error { + reloader, ok := e.downloader.(artifact.ConfigReloader) + if !ok { + return nil + } + + cfg, err := snapshotConfig(c, e.versionOverride) + if err != nil { + return errors.New(err, "snapshot.downloader: failed to generate snapshot config") + } + + return reloader.Reload(cfg) +} + +// Download fetches the package from configured source. +// Returns absolute path to downloaded package and an error. +func (e *Downloader) Download(ctx context.Context, spec program.Spec, version string) (string, error) { + return e.downloader.Download(ctx, spec, version) } func snapshotConfig(config *artifact.Config, versionOverride string) (*artifact.Config, error) { diff --git a/internal/pkg/artifact/download/snapshot/verifier.go b/internal/pkg/artifact/download/snapshot/verifier.go index e4e4e667be7..2e844635234 100644 --- a/internal/pkg/artifact/download/snapshot/verifier.go +++ b/internal/pkg/artifact/download/snapshot/verifier.go @@ -5,11 +5,18 @@ package snapshot import ( + "github.com/elastic/elastic-agent/internal/pkg/agent/errors" + "github.com/elastic/elastic-agent/internal/pkg/agent/program" "github.com/elastic/elastic-agent/internal/pkg/artifact" "github.com/elastic/elastic-agent/internal/pkg/artifact/download" "github.com/elastic/elastic-agent/internal/pkg/artifact/download/http" ) +type Verifier struct { + verifier download.Verifier + versionOverride string +} + // NewVerifier creates a downloader which first checks local directory // and then fallbacks to remote if configured. func NewVerifier(config *artifact.Config, allowEmptyPgp bool, pgp []byte, versionOverride string) (download.Verifier, error) { @@ -17,5 +24,33 @@ func NewVerifier(config *artifact.Config, allowEmptyPgp bool, pgp []byte, versio if err != nil { return nil, err } - return http.NewVerifier(cfg, allowEmptyPgp, pgp) + v, err := http.NewVerifier(cfg, allowEmptyPgp, pgp) + if err != nil { + return nil, errors.New(err, "failed to create snapshot verifier") + } + + return &Verifier{ + verifier: v, + versionOverride: versionOverride, + }, nil +} + +// Verify checks the package from configured source. +func (e *Verifier) Verify(spec program.Spec, version string) error { + return e.verifier.Verify(spec, version) +} + +func (e *Verifier) Reload(c *artifact.Config) error { + reloader, ok := e.verifier.(artifact.ConfigReloader) + if !ok { + return nil + } + + cfg, err := snapshotConfig(c, e.versionOverride) + if err != nil { + return errors.New(err, "snapshot.downloader: failed to generate snapshot config") + } + + return reloader.Reload(cfg) + }