Skip to content

Commit

Permalink
Fix Agent upgrade 8.2->8.3 (#578)
Browse files Browse the repository at this point in the history
* Fix Agent upgrade 8.2->8.3
* Improve the upgrade encryption handling. Add .yml files cleanup.
* Rollback ActionUpgrade to action_id, add MarkerActionUpgrade adapter struct for marker serialization compatibility
  • Loading branch information
aleksmaus authored Jun 20, 2022
1 parent 7db7406 commit d77a91e
Show file tree
Hide file tree
Showing 6 changed files with 434 additions and 79 deletions.
77 changes: 72 additions & 5 deletions internal/pkg/agent/application/upgrade/step_mark.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,58 @@ type UpdateMarker struct {
Action *fleetapi.ActionUpgrade `json:"action" yaml:"action"`
}

// MarkerActionUpgrade is an adapter struct that keeps the upgrade action
// stored in the marker file compatible with the pre-8.3 marker file format.
// It carries the subset of fleetapi.ActionUpgrade fields copied by
// convertToMarkerAction and convertToActionUpgrade.
type MarkerActionUpgrade struct {
// ActionID is serialized under the "id" key.
ActionID string `yaml:"id"`
// ActionType is serialized under the "type" key.
ActionType string `yaml:"type"`
// Version is serialized under the "version" key.
Version string `yaml:"version"`
// SourceURI is serialized under the "source_uri" key and omitted when empty.
SourceURI string `yaml:"source_uri,omitempty"`
}

// convertToMarkerAction copies the identifying fields of a fleet upgrade
// action into the marker-file representation. A nil action yields nil.
func convertToMarkerAction(a *fleetapi.ActionUpgrade) *MarkerActionUpgrade {
	if a == nil {
		return nil
	}

	var markerAction MarkerActionUpgrade
	markerAction.ActionID = a.ActionID
	markerAction.ActionType = a.ActionType
	markerAction.Version = a.Version
	markerAction.SourceURI = a.SourceURI
	return &markerAction
}

// convertToActionUpgrade rebuilds a fleet upgrade action from its marker-file
// representation. Only the fields stored in the marker are populated; a nil
// marker action yields nil.
func convertToActionUpgrade(a *MarkerActionUpgrade) *fleetapi.ActionUpgrade {
	if a == nil {
		return nil
	}

	var action fleetapi.ActionUpgrade
	action.ActionID = a.ActionID
	action.ActionType = a.ActionType
	action.Version = a.Version
	action.SourceURI = a.SourceURI
	return &action
}

// updateMarkerSerializer is the YAML representation of UpdateMarker.
// It mirrors the UpdateMarker fields but stores the action as a
// MarkerActionUpgrade instead of a fleetapi.ActionUpgrade.
type updateMarkerSerializer struct {
// Hash is serialized under the "hash" key.
Hash string `yaml:"hash"`
// UpdatedOn is serialized under the "updated_on" key.
UpdatedOn time.Time `yaml:"updated_on"`
// PrevVersion is serialized under the "prev_version" key.
PrevVersion string `yaml:"prev_version"`
// PrevHash is serialized under the "prev_hash" key.
PrevHash string `yaml:"prev_hash"`
// Acked is serialized under the "acked" key.
Acked bool `yaml:"acked"`
// Action is the marker-format upgrade action, serialized under the "action" key.
Action *MarkerActionUpgrade `yaml:"action"`
}

// newMarkerSerializer builds the serializable form of the given update
// marker, converting its action into the marker-file action format.
func newMarkerSerializer(m *UpdateMarker) *updateMarkerSerializer {
	var serializer updateMarkerSerializer
	serializer.Hash = m.Hash
	serializer.UpdatedOn = m.UpdatedOn
	serializer.PrevVersion = m.PrevVersion
	serializer.PrevHash = m.PrevHash
	serializer.Acked = m.Acked
	serializer.Action = convertToMarkerAction(m.Action)
	return &serializer
}

// markUpgrade records that an update happened so that the grace period can be handled.
func (u *Upgrader) markUpgrade(_ context.Context, hash string, action Action) error {
prevVersion := release.Version()
Expand All @@ -46,15 +98,15 @@ func (u *Upgrader) markUpgrade(_ context.Context, hash string, action Action) er
prevHash = prevHash[:hashLen]
}

marker := UpdateMarker{
marker := &UpdateMarker{
Hash: hash,
UpdatedOn: time.Now(),
PrevVersion: prevVersion,
PrevHash: prevHash,
Action: action.FleetAction(),
}

markerBytes, err := yaml.Marshal(marker)
markerBytes, err := yaml.Marshal(newMarkerSerializer(marker))
if err != nil {
return errors.New(err, errors.TypeConfig, "failed to parse marker file")
}
Expand Down Expand Up @@ -103,16 +155,31 @@ func LoadMarker() (*UpdateMarker, error) {
return nil, err
}

marker := &UpdateMarker{}
marker := &updateMarkerSerializer{}
if err := yaml.Unmarshal(markerBytes, &marker); err != nil {
return nil, err
}

return marker, nil
return &UpdateMarker{
Hash: marker.Hash,
UpdatedOn: marker.UpdatedOn,
PrevVersion: marker.PrevVersion,
PrevHash: marker.PrevHash,
Acked: marker.Acked,
Action: convertToActionUpgrade(marker.Action),
}, nil
}

func saveMarker(marker *UpdateMarker) error {
markerBytes, err := yaml.Marshal(marker)
makerSerializer := &updateMarkerSerializer{
Hash: marker.Hash,
UpdatedOn: marker.UpdatedOn,
PrevVersion: marker.PrevVersion,
PrevHash: marker.PrevHash,
Acked: marker.Acked,
Action: convertToMarkerAction(marker.Action),
}
markerBytes, err := yaml.Marshal(makerSerializer)
if err != nil {
return err
}
Expand Down
76 changes: 2 additions & 74 deletions internal/pkg/agent/application/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package upgrade

import (
"bytes"
"context"
"fmt"
"io/ioutil"
Expand All @@ -20,10 +19,8 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/secret"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/program"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
"github.com/elastic/elastic-agent/internal/pkg/artifact"
"github.com/elastic/elastic-agent/internal/pkg/capabilities"
"github.com/elastic/elastic-agent/internal/pkg/core/state"
Expand Down Expand Up @@ -173,10 +170,6 @@ func (u *Upgrader) Upgrade(ctx context.Context, a Action, reexecNow bool) (_ ree
return nil, errors.New(err, "failed to copy action store")
}

if err := encryptConfigIfNeeded(u.log, newHash); err != nil {
return nil, errors.New(err, "failed to encrypt the configuration")
}

if err := ChangeSymlink(ctx, newHash); err != nil {
rollbackInstall(ctx, newHash)
return nil, err
Expand Down Expand Up @@ -220,6 +213,8 @@ func (u *Upgrader) Ack(ctx context.Context) error {
return err
}

marker.Acked = true

return saveMarker(marker)
}

Expand Down Expand Up @@ -335,73 +330,6 @@ func copyVault(newHash string) error {
return nil
}

// encryptConfigIfNeeded creates the agent secret in the vault of the new
// version (the key is created if it doesn't exist) and encrypts the plain
// fleet.yml and state.yml files into their encrypted counterparts.
//
// Source .yml files that do not exist are skipped. The source files, and the
// fleet.yml lock file, are removed only when every step succeeded; on any
// error the plain files are left in place and the error is returned.
func encryptConfigIfNeeded(log *logger.Logger, newHash string) (err error) {
	vaultPath := getVaultPath(newHash)

	err = secret.CreateAgentSecret(secret.WithVaultPath(vaultPath))
	if err != nil {
		return err
	}

	newHome := filepath.Join(filepath.Dir(paths.Home()), fmt.Sprintf("%s-%s", agentName, newHash))
	ymlStateStorePath := filepath.Join(newHome, filepath.Base(paths.AgentStateStoreYmlFile()))
	stateStorePath := filepath.Join(newHome, filepath.Base(paths.AgentStateStoreFile()))

	files := []struct {
		Src string
		Dst string
	}{
		{
			Src: ymlStateStorePath,
			Dst: stateStorePath,
		},
		{
			Src: paths.AgentConfigYmlFile(),
			Dst: paths.AgentConfigFile(),
		},
	}
	for _, f := range files {
		var b []byte
		b, err = ioutil.ReadFile(f.Src)
		if err != nil {
			if os.IsNotExist(err) {
				// Nothing to encrypt for this file. Clear the named result so
				// that a missing source is not reported as a failure (and does
				// not block the deferred cleanup) when it is the last entry.
				err = nil
				continue
			}
			return err
		}

		// Encrypt yml file
		store := storage.NewEncryptedDiskStore(f.Dst, storage.WithVaultPath(vaultPath))
		err = store.Save(bytes.NewReader(b))
		if err != nil {
			return err
		}

		// Remove yml file if no errors. The removal is deliberately deferred to
		// function exit (not loop iteration end) so that no source file is
		// deleted unless the whole function succeeded.
		defer func(fp string) {
			if err != nil {
				return
			}
			if rerr := os.Remove(fp); rerr != nil {
				log.Warnf("failed to remove file: %s, err: %v", fp, rerr)
			}
		}(f.Src)
	}

	// Every failure path returns from inside the loop, so reaching this point
	// means all files were processed successfully and the lock file of
	// AgentConfigYmlFile can be removed. A removal failure is only logged.
	lockFp := paths.AgentConfigYmlFile() + ".lock"
	if rerr := os.Remove(lockFp); rerr != nil {
		log.Warnf("failed to remove file: %s, err: %v", lockFp, rerr)
	}

	return nil
}

// shutdownCallback returns a callback function to be executed during shutdown once all processes are closed.
// It goes through the runtime directory of the agent and copies all the state files created by processes to the new versioned
// home directory, with the process name updated to match the new version.
Expand Down
111 changes: 111 additions & 0 deletions internal/pkg/agent/cleaner/cleaner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package cleaner

import (
"context"
"os"
"sync"
"time"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent/internal/pkg/fileutil"
)

// defaultCleanWait is the default wait interval used by New.
// If the watchFile has not been modified for longer than this interval,
// all the files in the removeFiles array are removed.
const defaultCleanWait = 15 * time.Minute

// Cleaner removes a set of files once a watched file has not been modified
// for longer than the configured wait interval.
type Cleaner struct {
// log receives the cleaner's debug and warning messages.
log *logp.Logger
// watchFile is the file whose modification time is checked.
watchFile string
// removeFiles lists the files to delete.
removeFiles []string
// cleanWait is how long watchFile must stay unmodified before removeFiles are deleted.
cleanWait time.Duration

// mx is held while the files are being deleted.
mx sync.Mutex
}

// OptionFunc customizes a Cleaner; New applies the given options in order
// after the defaults have been set.
type OptionFunc func(c *Cleaner)

// New returns a Cleaner that watches watchFile and deletes removeFiles once
// the watched file has been left unmodified for the clean wait interval.
// The interval defaults to defaultCleanWait and can be overridden via opts.
func New(log *logp.Logger, watchFile string, removeFiles []string, opts ...OptionFunc) *Cleaner {
	cleaner := new(Cleaner)
	cleaner.log = log
	cleaner.watchFile = watchFile
	cleaner.removeFiles = removeFiles
	cleaner.cleanWait = defaultCleanWait

	// Options run after the defaults so they can override them.
	for i := range opts {
		opts[i](cleaner)
	}
	return cleaner
}

// WithCleanWait returns an option that overrides the Cleaner's wait interval.
func WithCleanWait(cleanWait time.Duration) OptionFunc {
	setWait := func(c *Cleaner) {
		c.cleanWait = cleanWait
	}
	return setWait
}

// Run checks the watched file immediately and then re-checks it every time
// the remaining wait elapses. It returns nil once the files have been deleted
// or ctx is cancelled, and returns the error if a check fails.
func (c *Cleaner) Run(ctx context.Context) error {
	wait, done, err := c.process()
	// err is nil whenever done is reported, so a single return covers both the
	// failure and the "already cleaned" outcomes.
	if err != nil || done {
		return err
	}

	timer := time.NewTimer(wait)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-timer.C:
		}

		c.log.Debug("cleaner: timer triggered")
		if wait, done, err = c.process(); err != nil || done {
			return err
		}
		// The timer has fired and its channel was drained above, so it is
		// safe to re-arm it with the newly computed wait.
		timer.Reset(wait)
	}
}

// process performs a single check of the watched file's modification time.
// If the file has been unmodified for longer than cleanWait, the files are
// deleted and done is true. Otherwise wait holds the time remaining until
// the interval expires. An error reading the modification time is returned
// as err with zero wait and done false.
func (c *Cleaner) process() (wait time.Duration, done bool, err error) {
	mtime, err := fileutil.GetModTime(c.watchFile)
	if err != nil {
		return 0, false, err
	}

	c.log.Debugf("cleaner: check file %s mod time: %v", c.watchFile, mtime)

	elapsed := time.Since(mtime)
	if elapsed <= c.cleanWait {
		// Not expired yet: report how much longer to wait.
		return c.cleanWait - elapsed, false, nil
	}

	c.log.Debugf("cleaner: file %s modification expired", c.watchFile)
	c.deleteFiles()
	return 0, true, nil
}

// deleteFiles removes every file in removeFiles while holding the cleaner's
// mutex. Removal failures are logged as warnings and do not stop the loop.
func (c *Cleaner) deleteFiles() {
	c.log.Debugf("cleaner: delete files: %v", c.removeFiles)

	c.mx.Lock()
	defer c.mx.Unlock()

	for i := range c.removeFiles {
		target := c.removeFiles[i]
		c.log.Debugf("cleaner: delete file: %v", target)
		if rmErr := os.Remove(target); rmErr != nil {
			c.log.Warnf("cleaner: delete file %v failed: %v", target, rmErr)
		}
	}
}
Loading

0 comments on commit d77a91e

Please sign in to comment.