diff --git a/internal/pkg/agent/application/upgrade/step_mark.go b/internal/pkg/agent/application/upgrade/step_mark.go
index e176e4c5b96..66924337699 100644
--- a/internal/pkg/agent/application/upgrade/step_mark.go
+++ b/internal/pkg/agent/application/upgrade/step_mark.go
@@ -38,6 +38,58 @@ type UpdateMarker struct {
 	Action *fleetapi.ActionUpgrade `json:"action" yaml:"action"`
 }
 
+// MarkerActionUpgrade adapter struct compatible with the pre-8.3 version of the marker file format
+type MarkerActionUpgrade struct {
+	ActionID   string `yaml:"id"`
+	ActionType string `yaml:"type"`
+	Version    string `yaml:"version"`
+	SourceURI  string `yaml:"source_uri,omitempty"`
+}
+
+func convertToMarkerAction(a *fleetapi.ActionUpgrade) *MarkerActionUpgrade {
+	if a == nil {
+		return nil
+	}
+	return &MarkerActionUpgrade{
+		ActionID:   a.ActionID,
+		ActionType: a.ActionType,
+		Version:    a.Version,
+		SourceURI:  a.SourceURI,
+	}
+}
+
+func convertToActionUpgrade(a *MarkerActionUpgrade) *fleetapi.ActionUpgrade {
+	if a == nil {
+		return nil
+	}
+	return &fleetapi.ActionUpgrade{
+		ActionID:   a.ActionID,
+		ActionType: a.ActionType,
+		Version:    a.Version,
+		SourceURI:  a.SourceURI,
+	}
+}
+
+type updateMarkerSerializer struct {
+	Hash        string               `yaml:"hash"`
+	UpdatedOn   time.Time            `yaml:"updated_on"`
+	PrevVersion string               `yaml:"prev_version"`
+	PrevHash    string               `yaml:"prev_hash"`
+	Acked       bool                 `yaml:"acked"`
+	Action      *MarkerActionUpgrade `yaml:"action"`
+}
+
+func newMarkerSerializer(m *UpdateMarker) *updateMarkerSerializer {
+	return &updateMarkerSerializer{
+		Hash:        m.Hash,
+		UpdatedOn:   m.UpdatedOn,
+		PrevVersion: m.PrevVersion,
+		PrevHash:    m.PrevHash,
+		Acked:       m.Acked,
+		Action:      convertToMarkerAction(m.Action),
+	}
+}
+
 // markUpgrade marks update happened so we can handle grace period
 func (u *Upgrader) markUpgrade(_ context.Context, hash string, action Action) error {
 	prevVersion := release.Version()
@@ -46,7 +98,7 @@ func (u *Upgrader) markUpgrade(_ context.Context, hash string, action Action) er
 		prevHash = prevHash[:hashLen]
 	}
 
-	marker := UpdateMarker{
+	marker := &UpdateMarker{
 		Hash:        hash,
 		UpdatedOn:   time.Now(),
 		PrevVersion: prevVersion,
@@ -54,7 +106,7 @@ func (u *Upgrader) markUpgrade(_ context.Context, hash string, action Action) er
 		Action:      action.FleetAction(),
 	}
 
-	markerBytes, err := yaml.Marshal(marker)
+	markerBytes, err := yaml.Marshal(newMarkerSerializer(marker))
 	if err != nil {
 		return errors.New(err, errors.TypeConfig, "failed to parse marker file")
 	}
@@ -103,16 +155,31 @@ func LoadMarker() (*UpdateMarker, error) {
 		return nil, err
 	}
 
-	marker := &UpdateMarker{}
+	marker := &updateMarkerSerializer{}
 	if err := yaml.Unmarshal(markerBytes, &marker); err != nil {
 		return nil, err
 	}
 
-	return marker, nil
+	return &UpdateMarker{
+		Hash:        marker.Hash,
+		UpdatedOn:   marker.UpdatedOn,
+		PrevVersion: marker.PrevVersion,
+		PrevHash:    marker.PrevHash,
+		Acked:       marker.Acked,
+		Action:      convertToActionUpgrade(marker.Action),
+	}, nil
 }
 
 func saveMarker(marker *UpdateMarker) error {
-	markerBytes, err := yaml.Marshal(marker)
+	markerSerializer := &updateMarkerSerializer{
+		Hash:        marker.Hash,
+		UpdatedOn:   marker.UpdatedOn,
+		PrevVersion: marker.PrevVersion,
+		PrevHash:    marker.PrevHash,
+		Acked:       marker.Acked,
+		Action:      convertToMarkerAction(marker.Action),
+	}
+	markerBytes, err := yaml.Marshal(markerSerializer)
 	if err != nil {
 		return err
 	}
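The serializer above exists so the upgrade marker written by 8.3 still looks like the one pre-8.3 agents expect (a plain `id`/`type`/`version`/`source_uri` action block) while the in-memory `UpdateMarker` keeps using `fleetapi.ActionUpgrade`. Below is a minimal, standalone sketch of that round-trip; the struct definitions are local simplifications for illustration, not the package's real types.

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Simplified stand-ins for the real marker types, for illustration only.
type markerAction struct {
	ActionID   string `yaml:"id"`
	ActionType string `yaml:"type"`
	Version    string `yaml:"version"`
	SourceURI  string `yaml:"source_uri,omitempty"`
}

type markerFile struct {
	Hash   string        `yaml:"hash"`
	Acked  bool          `yaml:"acked"`
	Action *markerAction `yaml:"action"`
}

func main() {
	// Marshal with the pre-8.3-compatible field names.
	out, _ := yaml.Marshal(markerFile{
		Hash:  "abc123",
		Acked: false,
		Action: &markerAction{
			ActionID:   "action-1",
			ActionType: "UPGRADE",
			Version:    "8.3.0",
		},
	})
	fmt.Print(string(out))

	// Reading the marker back restores the same shape.
	var in markerFile
	_ = yaml.Unmarshal(out, &in)
	fmt.Printf("action id=%s version=%s\n", in.Action.ActionID, in.Action.Version)
}
```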
diff --git a/internal/pkg/agent/application/upgrade/upgrade.go b/internal/pkg/agent/application/upgrade/upgrade.go
index 81fb7a78444..9d67165d0eb 100644
--- a/internal/pkg/agent/application/upgrade/upgrade.go
+++ b/internal/pkg/agent/application/upgrade/upgrade.go
@@ -5,7 +5,6 @@
 package upgrade
 
 import (
-	"bytes"
 	"context"
 	"fmt"
 	"io/ioutil"
@@ -20,10 +19,8 @@ import (
 	"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
 	"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
 	"github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec"
-	"github.com/elastic/elastic-agent/internal/pkg/agent/application/secret"
 	"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
 	"github.com/elastic/elastic-agent/internal/pkg/agent/program"
-	"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
 	"github.com/elastic/elastic-agent/internal/pkg/artifact"
 	"github.com/elastic/elastic-agent/internal/pkg/capabilities"
 	"github.com/elastic/elastic-agent/internal/pkg/core/state"
@@ -173,10 +170,6 @@ func (u *Upgrader) Upgrade(ctx context.Context, a Action, reexecNow bool) (_ ree
 		return nil, errors.New(err, "failed to copy action store")
 	}
 
-	if err := encryptConfigIfNeeded(u.log, newHash); err != nil {
-		return nil, errors.New(err, "failed to encrypt the configuration")
-	}
-
 	if err := ChangeSymlink(ctx, newHash); err != nil {
 		rollbackInstall(ctx, newHash)
 		return nil, err
@@ -220,6 +213,8 @@ func (u *Upgrader) Ack(ctx context.Context) error {
 		return err
 	}
 
+	marker.Acked = true
+
 	return saveMarker(marker)
 }
 
@@ -335,73 +330,6 @@ func copyVault(newHash string) error {
 	return nil
 }
 
-// Create the key if it doesn't exist and encrypt the fleet.yml and state.yml
-func encryptConfigIfNeeded(log *logger.Logger, newHash string) (err error) {
-	vaultPath := getVaultPath(newHash)
-
-	err = secret.CreateAgentSecret(secret.WithVaultPath(vaultPath))
-	if err != nil {
-		return err
-	}
-
-	newHome := filepath.Join(filepath.Dir(paths.Home()), fmt.Sprintf("%s-%s", agentName, newHash))
-	ymlStateStorePath := filepath.Join(newHome, filepath.Base(paths.AgentStateStoreYmlFile()))
-	stateStorePath := filepath.Join(newHome, filepath.Base(paths.AgentStateStoreFile()))
-
-	files := []struct {
-		Src string
-		Dst string
-	}{
-		{
-			Src: ymlStateStorePath,
-			Dst: stateStorePath,
-		},
-		{
-			Src: paths.AgentConfigYmlFile(),
-			Dst: paths.AgentConfigFile(),
-		},
-	}
-	for _, f := range files {
-		var b []byte
-		b, err = ioutil.ReadFile(f.Src)
-		if err != nil {
-			if os.IsNotExist(err) {
-				continue
-			}
-			return err
-		}
-
-		// Encrypt yml file
-		store := storage.NewEncryptedDiskStore(f.Dst, storage.WithVaultPath(vaultPath))
-		err = store.Save(bytes.NewReader(b))
-		if err != nil {
-			return err
-		}
-
-		// Remove yml file if no errors
-		defer func(fp string) {
-			if err != nil {
-				return
-			}
-			if rerr := os.Remove(fp); rerr != nil {
-				log.Warnf("failed to remove file: %s, err: %v", fp, rerr)
-			}
-		}(f.Src)
-	}
-
-	// Do not remove AgentConfigYmlFile lock file if any error happened.
-	if err != nil {
-		return err
-	}
-
-	lockFp := paths.AgentConfigYmlFile() + ".lock"
-	if rerr := os.Remove(lockFp); rerr != nil {
-		log.Warnf("failed to remove file: %s, err: %v", lockFp, rerr)
-	}
-
-	return err
-}
-
 // shutdownCallback returns a callback function to be executing during shutdown once all processes are closed.
 // this goes through runtime directory of agent and copies all the state files created by processes to new versioned
 // home directory with updated process name to match new version.
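With `Acked` now persisted in the marker, `Ack` can record that the Fleet upgrade action was acknowledged and skip re-acking on later runs. A rough, self-contained sketch of that pattern follows; the types and function names are hypothetical stand-ins, not the actual Upgrader code.

```go
package main

import "fmt"

// marker mirrors the acked flag persisted in the upgrade marker (illustration only).
type marker struct {
	ActionID string
	Acked    bool
}

// ackOnce acknowledges the action only if it hasn't been acknowledged yet,
// then persists the updated marker via the supplied save function.
func ackOnce(m *marker, ack func(actionID string) error, save func(*marker) error) error {
	if m == nil || m.Acked {
		return nil // nothing to do, or already acknowledged
	}
	if err := ack(m.ActionID); err != nil {
		return err
	}
	m.Acked = true
	return save(m)
}

func main() {
	m := &marker{ActionID: "action-1"}
	ack := func(id string) error { fmt.Println("acking", id); return nil }
	save := func(m *marker) error { fmt.Println("saving marker, acked:", m.Acked); return nil }

	_ = ackOnce(m, ack, save) // acks and saves
	_ = ackOnce(m, ack, save) // no-op on the second call
}
```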
diff --git a/internal/pkg/agent/cleaner/cleaner.go b/internal/pkg/agent/cleaner/cleaner.go
new file mode 100644
index 00000000000..856ae020b89
--- /dev/null
+++ b/internal/pkg/agent/cleaner/cleaner.go
@@ -0,0 +1,111 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package cleaner
+
+import (
+	"context"
+	"os"
+	"sync"
+	"time"
+
+	"github.com/elastic/elastic-agent-libs/logp"
+	"github.com/elastic/elastic-agent/internal/pkg/fileutil"
+)
+
+// Wait interval.
+// If the watchFile was not modified after this interval, then remove all the files in the removeFiles array
+const defaultCleanWait = 15 * time.Minute
+
+type Cleaner struct {
+	log         *logp.Logger
+	watchFile   string
+	removeFiles []string
+	cleanWait   time.Duration
+
+	mx sync.Mutex
+}
+
+type OptionFunc func(c *Cleaner)
+
+func New(log *logp.Logger, watchFile string, removeFiles []string, opts ...OptionFunc) *Cleaner {
+	c := &Cleaner{
+		log:         log,
+		watchFile:   watchFile,
+		removeFiles: removeFiles,
+		cleanWait:   defaultCleanWait,
+	}
+
+	for _, opt := range opts {
+		opt(c)
+	}
+	return c
+}
+
+func WithCleanWait(cleanWait time.Duration) OptionFunc {
+	return func(c *Cleaner) {
+		c.cleanWait = cleanWait
+	}
+}
+
+func (c *Cleaner) Run(ctx context.Context) error {
+	wait, done, err := c.process()
+	if err != nil {
+		return err
+	}
+
+	if done {
+		return nil
+	}
+
+	t := time.NewTimer(wait)
+	defer t.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case <-t.C:
+			c.log.Debug("cleaner: timer triggered")
+			wait, done, err = c.process()
+			if err != nil {
+				return err
+			}
+
+			if done {
+				return nil
+			}
+			t.Reset(wait)
+		}
+	}
+}
+
+func (c *Cleaner) process() (wait time.Duration, done bool, err error) {
+	modTime, err := fileutil.GetModTime(c.watchFile)
+	if err != nil {
+		return
+	}
+
+	c.log.Debugf("cleaner: check file %s mod time: %v", c.watchFile, modTime)
+	curDur := time.Since(modTime)
+	if curDur > c.cleanWait {
+		c.log.Debugf("cleaner: file %s modification expired", c.watchFile)
+		c.deleteFiles()
+		return wait, true, nil
+	}
+	wait = c.cleanWait - curDur
+	return wait, false, nil
+}
+
+func (c *Cleaner) deleteFiles() {
+	c.log.Debugf("cleaner: delete files: %v", c.removeFiles)
+	c.mx.Lock()
+	defer c.mx.Unlock()
+	for _, fp := range c.removeFiles {
+		c.log.Debugf("cleaner: delete file: %v", fp)
+		err := os.Remove(fp)
+		if err != nil {
+			c.log.Warnf("cleaner: delete file %v failed: %v", fp, err)
+		}
+	}
+}
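The core of `process` is a simple decision: if the watched file has not changed for `cleanWait`, delete the old files and stop; otherwise report how long to wait before checking again. Here is a standalone sketch of just that calculation; the helper name and the durations are made up for the example.

```go
package main

import (
	"fmt"
	"time"
)

// nextWait mirrors the cleaner's decision: done when the file has been
// unmodified for at least cleanWait, otherwise return the remaining wait.
func nextWait(modTime time.Time, cleanWait time.Duration, now time.Time) (wait time.Duration, done bool) {
	elapsed := now.Sub(modTime)
	if elapsed > cleanWait {
		return 0, true
	}
	return cleanWait - elapsed, false
}

func main() {
	now := time.Now()
	cleanWait := 15 * time.Minute

	// Modified 5 minutes ago: not stable yet, check again in ~10 minutes.
	wait, done := nextWait(now.Add(-5*time.Minute), cleanWait, now)
	fmt.Println(wait, done) // 10m0s false

	// Modified 20 minutes ago: stable, safe to delete the old files.
	wait, done = nextWait(now.Add(-20*time.Minute), cleanWait, now)
	fmt.Println(wait, done) // 0s true
}
```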
diff --git a/internal/pkg/agent/cleaner/cleaner_test.go b/internal/pkg/agent/cleaner/cleaner_test.go
new file mode 100644
index 00000000000..cf189b784d3
--- /dev/null
+++ b/internal/pkg/agent/cleaner/cleaner_test.go
@@ -0,0 +1,68 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package cleaner
+
+import (
+	"context"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/elastic/elastic-agent-libs/logp"
+)
+
+func TestCleaner(t *testing.T) {
+	// Setup
+	const watchFileName = "fleet.enc"
+	removeFiles := []string{"fleet.yml", "fleet.yml.lock"}
+
+	ctx, cn := context.WithCancel(context.Background())
+	defer cn()
+
+	dir := t.TempDir()
+	watchFilePath := filepath.Join(dir, watchFileName)
+
+	removeFilePaths := make([]string, len(removeFiles))
+
+	checkDir(t, dir, 0)
+
+	// Create files
+	err := ioutil.WriteFile(watchFilePath, []byte{}, 0600)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i, fn := range removeFiles {
+		removeFilePaths[i] = filepath.Join(dir, fn)
+		err := ioutil.WriteFile(removeFilePaths[i], []byte{}, 0600)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	checkDir(t, dir, len(removeFiles)+1)
+
+	log := logp.NewLogger("dynamic")
+	cleaner := New(log, watchFilePath, removeFilePaths, WithCleanWait(500*time.Millisecond))
+	err = cleaner.Run(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	checkDir(t, dir, 1)
+}
+
+func checkDir(t *testing.T, dir string, expectedCount int) {
+	t.Helper()
+	entries, err := os.ReadDir(dir)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(entries) != expectedCount {
+		t.Fatalf("Dir %s expected %d entries, found %d", dir, expectedCount, len(entries))
+	}
+}
diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go
index b584baf2f09..723631b7960 100644
--- a/internal/pkg/agent/cmd/run.go
+++ b/internal/pkg/agent/cmd/run.go
@@ -5,6 +5,7 @@
 package cmd
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"io/ioutil"
@@ -20,6 +21,7 @@ import (
 	"gopkg.in/yaml.v2"
 
 	"github.com/elastic/elastic-agent-libs/api"
+	"github.com/elastic/elastic-agent-libs/logp"
 	"github.com/elastic/elastic-agent-libs/monitoring"
 	"github.com/elastic/elastic-agent-libs/service"
 	"github.com/elastic/elastic-agent-system-metrics/report"
@@ -31,6 +33,7 @@ import (
 	"github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec"
 	"github.com/elastic/elastic-agent/internal/pkg/agent/application/secret"
 	"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade"
+	"github.com/elastic/elastic-agent/internal/pkg/agent/cleaner"
 	"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
 	"github.com/elastic/elastic-agent/internal/pkg/agent/control/server"
 	"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
@@ -41,6 +44,7 @@ import (
 	monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
 	monitoringServer "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/server"
 	"github.com/elastic/elastic-agent/internal/pkg/core/status"
+	"github.com/elastic/elastic-agent/internal/pkg/fileutil"
 	"github.com/elastic/elastic-agent/internal/pkg/release"
 	"github.com/elastic/elastic-agent/pkg/core/logger"
 	"github.com/elastic/elastic-agent/version"
@@ -126,6 +130,19 @@ func run(override cfgOverrider) error {
 		return err
 	}
 
+	// Check if fleet.yml or state.yml exists and encrypt them.
+	// This is needed to handle the upgrade properly.
+	// On agent upgrade the older version (for example 8.2) unpacks the 8.3 agent
+	// and tries to run it.
+	// The new version of the agent requires encrypted configuration files; otherwise it will not start, and the upgrade will fail and revert.
+	err = encryptConfigIfNeeded(logger)
+	if err != nil {
+		return err
+	}
+
+	// Start the old unencrypted agent configuration file cleaner
+	startOldAgentConfigCleaner(ctx, logger)
+
 	agentInfo, err := info.NewAgentInfoWithLog(defaultLogLevel(cfg), createAgentID)
 	if err != nil {
 		return errors.New(err,
@@ -476,3 +493,121 @@ func initTracer(agentName, version string, mcfg *monitoringCfg.MonitoringConfig)
 		Transport: ts,
 	})
 }
+
+// encryptConfigIfNeeded encrypts fleet.yml or state.yml if fleet.enc or state.enc does not exist already.
+func encryptConfigIfNeeded(log *logger.Logger) (err error) {
+	log.Debug("encrypt config if needed")
+
+	files := []struct {
+		Src string
+		Dst string
+	}{
+		{
+			Src: paths.AgentStateStoreYmlFile(),
+			Dst: paths.AgentStateStoreFile(),
+		},
+		{
+			Src: paths.AgentConfigYmlFile(),
+			Dst: paths.AgentConfigFile(),
+		},
+	}
+	for _, f := range files {
+		var b []byte
+
+		// Check the .yml file modification timestamp and existence
+		log.Debugf("check if the yml file %v exists", f.Src)
+		ymlModTime, ymlExists, err := fileutil.GetModTimeExists(f.Src)
+		if err != nil {
+			log.Errorf("failed to access yml file %v: %v", f.Src, err)
+			return err
+		}
+
+		if !ymlExists {
+			log.Debugf("yml file %v doesn't exist, continue", f.Src)
+			continue
+		}
+
+		// Check the .enc file modification timestamp and existence
+		log.Debugf("check if the enc file %v exists", f.Dst)
+		encModTime, encExists, err := fileutil.GetModTimeExists(f.Dst)
+		if err != nil {
+			log.Errorf("failed to access enc file %v: %v", f.Dst, err)
+			return err
+		}
+
+		// If the enc file exists and the yml file modification time is before the enc file modification time, then skip encryption.
+		// The reasoning is that the yml was not modified since the last time it was migrated to the encrypted file.
+		// Modification of the yml is possible in cases where the agent upgrade failed and rolled back, leaving the .enc file on disk, for example.
+		if encExists && ymlModTime.Before(encModTime) {
+			log.Debugf("enc file %v already exists, and the yml was not modified after migration, yml mod time: %v, enc mod time: %v", f.Dst, ymlModTime, encModTime)
+			continue
+		}
+
+		log.Debugf("read file: %v", f.Src)
+		b, err = ioutil.ReadFile(f.Src)
+		if err != nil {
+			log.Debugf("read file: %v, err: %v", f.Src, err)
+			return err
+		}
+
+		// Encrypt yml file
+		log.Debugf("encrypt file %v into %v", f.Src, f.Dst)
+		store := storage.NewEncryptedDiskStore(f.Dst)
+		err = store.Save(bytes.NewReader(b))
+		if err != nil {
+			log.Debugf("failed to encrypt file: %v, err: %v", f.Dst, err)
+			return err
+		}
+	}
+
+	if err != nil {
+		return err
+	}
+
+	// Remove state.yml file if no errors
+	fp := paths.AgentStateStoreYmlFile()
+	// Check if state.yml exists
+	exists, err := fileutil.FileExists(fp)
+	if err != nil {
+		log.Warnf("failed to check if file %s exists, err: %v", fp, err)
+	}
+	if exists {
+		if err := os.Remove(fp); err != nil {
+			// Log only
+			log.Warnf("failed to remove file: %s, err: %v", fp, err)
+		}
+	}
+
+	// The agent can't remove fleet.yml, because it can be rolled back by the older version of the agent "watcher",
+	// and the pre-8.3 version needs an unencrypted fleet.yml file in order to start.
+	// The fleet.yml removal is performed by the cleaner on agent start, after the .enc configuration has been stable for the post-upgrade grace period.
+
+	return nil
+}
+
+// startOldAgentConfigCleaner starts the cleaner that removes the fleet.yml and fleet.yml.lock files after 15 mins by default.
+// The interval is calculated from the last modified time of fleet.enc. It's possible that fleet.enc
+// will be modified again during that time; the assumption is that at some point there will be a 15 min interval when fleet.enc is not modified.
+// The modification time is used because it's the most cross-platform compatible timestamp on the files.
+// This is tied to the grace period (10 mins by default) after which the agent is considered "stable" following the upgrade.
+// The old agent watcher doesn't know anything about configuration encryption, so we have to delete the old configuration files here.
+// The cleaner is only started if fleet.yml exists.
+func startOldAgentConfigCleaner(ctx context.Context, log *logp.Logger) {
+	// Start cleaner only when fleet.yml exists
+	fp := paths.AgentConfigYmlFile()
+	exists, err := fileutil.FileExists(fp)
+	if err != nil {
+		log.Warnf("failed to check if file %s exists, err: %v", fp, err)
+	}
+	if !exists {
+		return
+	}
+
+	c := cleaner.New(log, paths.AgentConfigFile(), []string{fp, fmt.Sprintf("%s.lock", fp)})
+	go func() {
+		err := c.Run(ctx)
+		if err != nil {
+			log.Warnf("failed running the old configuration files cleaner, err: %v", err)
+		}
+	}()
+}
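The key rule in `encryptConfigIfNeeded` is the mod-time comparison: migrate the `.yml` only if no `.enc` exists yet, or if the `.yml` was touched after the `.enc` was written (for example after a failed upgrade rolled back). A hedged, standalone sketch of that rule, using a hypothetical helper rather than the agent's real paths and logging:

```go
package main

import (
	"fmt"
	"time"
)

// shouldMigrate mirrors the decision in encryptConfigIfNeeded: skip when the
// encrypted copy exists and is newer than the plain yml file.
func shouldMigrate(ymlExists bool, ymlMod time.Time, encExists bool, encMod time.Time) bool {
	if !ymlExists {
		return false // nothing to migrate
	}
	if encExists && ymlMod.Before(encMod) {
		return false // yml untouched since the last migration
	}
	return true
}

func main() {
	now := time.Now()

	// fleet.yml exists, fleet.enc does not: migrate.
	fmt.Println(shouldMigrate(true, now, false, time.Time{})) // true

	// fleet.enc is newer than fleet.yml: already migrated, skip.
	fmt.Println(shouldMigrate(true, now.Add(-time.Hour), true, now)) // false

	// fleet.yml modified after fleet.enc (e.g. a rollback rewrote it): migrate again.
	fmt.Println(shouldMigrate(true, now, true, now.Add(-time.Hour))) // true
}
```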
diff --git a/internal/pkg/fileutil/fileutil.go b/internal/pkg/fileutil/fileutil.go
new file mode 100644
index 00000000000..86d1db249aa
--- /dev/null
+++ b/internal/pkg/fileutil/fileutil.go
@@ -0,0 +1,46 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package fileutil
+
+import (
+	"errors"
+	"io/fs"
+	"os"
+	"time"
+)
+
+// FileExists returns true if the file/dir exists
+func FileExists(fp string) (bool, error) {
+	_, err := os.Stat(fp)
+	if err != nil {
+		if errors.Is(err, fs.ErrNotExist) {
+			return false, nil
+		}
+		return false, err
+	}
+	return true, nil
+}
+
+// GetModTime returns the file modification time
+func GetModTime(fp string) (time.Time, error) {
+	fi, err := os.Stat(fp)
+	if err != nil {
+		return time.Time{}, err
+	}
+	return fi.ModTime(), nil
+}
+
+// GetModTimeExists returns the file modification time and existence status.
+// Returns no error if the file doesn't exist.
+func GetModTimeExists(fp string) (time.Time, bool, error) {
+	modTime, err := GetModTime(fp)
+	if err != nil {
+		if errors.Is(err, fs.ErrNotExist) {
+			return modTime, false, nil
+		}
+		return modTime, false, err
+	}
+	return modTime, true, nil
+}
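For completeness, a small usage sketch of the helpers above. The import path assumes the package location introduced by this diff (so it only builds inside the elastic-agent repo), and the file name is an arbitrary example.

```go
package main

import (
	"fmt"
	"log"

	"github.com/elastic/elastic-agent/internal/pkg/fileutil"
)

func main() {
	const fp = "fleet.yml" // arbitrary example path

	// Existence check that treats "not exist" as a normal result, not an error.
	exists, err := fileutil.FileExists(fp)
	if err != nil {
		log.Fatalf("stat %s: %v", fp, err)
	}
	fmt.Println("exists:", exists)

	// Modification time plus existence in one call, as used by the cleaner and run.go.
	modTime, ok, err := fileutil.GetModTimeExists(fp)
	if err != nil {
		log.Fatalf("stat %s: %v", fp, err)
	}
	if ok {
		fmt.Println("last modified:", modTime)
	}
}
```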