diff --git a/x-pack/elastic-agent/CHANGELOG.asciidoc b/x-pack/elastic-agent/CHANGELOG.asciidoc index d0edf3666d5..4ceaff77f46 100644 --- a/x-pack/elastic-agent/CHANGELOG.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.asciidoc @@ -36,6 +36,7 @@ - Windows agent doesn't uninstall with a lowercase `c:` drive in the path {pull}23998[23998] - Fix reloading of log level for services {pull}[24055]24055 - Fix: Successfully installed and enrolled agent running standalone{pull}[24128]24128 +- Make installer atomic on windows {pull}[24253]24253 ==== New features diff --git a/x-pack/elastic-agent/pkg/agent/operation/operator.go b/x-pack/elastic-agent/pkg/agent/operation/operator.go index a95bfe5b165..a0416545b11 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operator.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operator.go @@ -36,6 +36,10 @@ const ( isMonitoringLogsFlag = 1 << 1 ) +type waiter interface { + Wait() +} + // Operator runs Start/Stop/Update operations // it is responsible for detecting reconnect to existing processes // based on backed up configuration @@ -182,6 +186,13 @@ func (o *Operator) HandleConfig(cfg configrequest.Request) error { // Shutdown handles shutting down the running apps for Agent shutdown. func (o *Operator) Shutdown() { + // wait for installer and downloader + if awaitable, ok := o.installer.(waiter); ok { + o.logger.Infof("waiting for installer of pipeline '%s' to finish", o.pipelineID) + awaitable.Wait() + o.logger.Debugf("pipeline installer '%s' done", o.pipelineID) + } + for _, app := range o.apps { app.Shutdown() } diff --git a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go index f6c139ca463..980bb2127f9 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go +++ b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go @@ -9,6 +9,9 @@ import ( "io/ioutil" "os" "path/filepath" + "runtime" + + "github.com/hashicorp/go-multierror" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" @@ -51,15 +54,28 @@ func (i *Installer) Install(ctx context.Context, spec program.Spec, version, ins if err := i.installer.Install(ctx, spec, version, tempInstallDir); err != nil { // cleanup unfinished install - os.RemoveAll(tempInstallDir) + if rerr := os.RemoveAll(tempInstallDir); rerr != nil { + err = multierror.Append(err, rerr) + } return err } if err := os.Rename(tempInstallDir, installDir); err != nil { - os.RemoveAll(installDir) - os.RemoveAll(tempInstallDir) + if rerr := os.RemoveAll(installDir); rerr != nil { + err = multierror.Append(err, rerr) + } + if rerr := os.RemoveAll(tempInstallDir); rerr != nil { + err = multierror.Append(err, rerr) + } return err } + // on windows rename is not atomic, let's force it to flush the cache + if runtime.GOOS == "windows" { + if f, err := os.OpenFile(installDir, os.O_SYNC|os.O_RDWR, 0755); err == nil { + f.Sync() + } + } + return nil } diff --git a/x-pack/elastic-agent/pkg/artifact/install/awaitable/awaitable_installer.go b/x-pack/elastic-agent/pkg/artifact/install/awaitable/awaitable_installer.go new file mode 100644 index 00000000000..3525def1c7f --- /dev/null +++ b/x-pack/elastic-agent/pkg/artifact/install/awaitable/awaitable_installer.go @@ -0,0 +1,57 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awaitable + +import ( + "context" + "sync" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" +) + +type embeddedInstaller interface { + Install(ctx context.Context, spec program.Spec, version, installDir string) error +} + +type embeddedChecker interface { + Check(ctx context.Context, spec program.Spec, version, installDir string) error +} + +// Installer installs into temporary destination and moves to correct one after +// successful finish. +type Installer struct { + installer embeddedInstaller + checker embeddedChecker + wg sync.WaitGroup +} + +// NewInstaller creates a new AtomicInstaller +func NewInstaller(i embeddedInstaller, ch embeddedChecker) (*Installer, error) { + return &Installer{ + installer: i, + checker: ch, + }, nil +} + +// Wait allows caller to wait for install to be finished +func (i *Installer) Wait() { + i.wg.Wait() +} + +// Install performs installation of program in a specific version. +func (i *Installer) Install(ctx context.Context, spec program.Spec, version, installDir string) error { + i.wg.Add(1) + defer i.wg.Done() + + return i.installer.Install(ctx, spec, version, installDir) +} + +// Check performs installation checks +func (i *Installer) Check(ctx context.Context, spec program.Spec, version, installDir string) error { + i.wg.Add(1) + defer i.wg.Done() + + return i.checker.Check(ctx, spec, version, installDir) +} diff --git a/x-pack/elastic-agent/pkg/artifact/install/installer.go b/x-pack/elastic-agent/pkg/artifact/install/installer.go index b99563ff997..a53cc46633b 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/installer.go +++ b/x-pack/elastic-agent/pkg/artifact/install/installer.go @@ -12,6 +12,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/atomic" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/awaitable" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/dir" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/hooks" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/tar" @@ -39,12 +40,20 @@ type InstallerChecker interface { Check(ctx context.Context, spec program.Spec, version, installDir string) error } +// AwaitableInstallerChecker is an interface that installs, checks but also is awaitable to check when actions are done. +type AwaitableInstallerChecker interface { + InstallerChecker + + // Waits for its work to be done. + Wait() +} + // NewInstaller returns a correct installer associated with a // package type: // - rpm -> rpm installer // - deb -> deb installer // - binary -> zip installer on windows, tar installer on linux and mac -func NewInstaller(config *artifact.Config) (InstallerChecker, error) { +func NewInstaller(config *artifact.Config) (AwaitableInstallerChecker, error) { if config == nil { return nil, ErrConfigNotProvided } @@ -66,5 +75,10 @@ func NewInstaller(config *artifact.Config) (InstallerChecker, error) { return nil, err } - return hooks.NewInstallerChecker(atomicInstaller, dir.NewChecker()) + hooksInstaller, err := hooks.NewInstallerChecker(atomicInstaller, dir.NewChecker()) + if err != nil { + return nil, err + } + + return awaitable.NewInstaller(hooksInstaller, hooksInstaller) } diff --git a/x-pack/elastic-agent/pkg/artifact/install/tar/tar_installer.go b/x-pack/elastic-agent/pkg/artifact/install/tar/tar_installer.go index 2452f5909cc..421a30bed4f 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/tar/tar_installer.go +++ b/x-pack/elastic-agent/pkg/artifact/install/tar/tar_installer.go @@ -33,7 +33,7 @@ func NewInstaller(config *artifact.Config) (*Installer, error) { // Install performs installation of program in a specific version. // It expects package to be already downloaded. -func (i *Installer) Install(_ context.Context, spec program.Spec, version, installDir string) error { +func (i *Installer) Install(ctx context.Context, spec program.Spec, version, installDir string) error { artifactPath, err := artifact.GetArtifactPath(spec, version, i.config.OS(), i.config.Arch(), i.config.TargetDirectory) if err != nil { return err @@ -53,10 +53,10 @@ func (i *Installer) Install(_ context.Context, spec program.Spec, version, insta // unpack must occur in directory that holds the installation directory // or the extraction will be double nested - return unpack(f, filepath.Dir(installDir)) + return unpack(ctx, f, filepath.Dir(installDir)) } -func unpack(r io.Reader, dir string) error { +func unpack(ctx context.Context, r io.Reader, dir string) error { zr, err := gzip.NewReader(r) if err != nil { return errors.New("requires gzip-compressed body", err, errors.TypeFilesystem) @@ -66,6 +66,11 @@ func unpack(r io.Reader, dir string) error { var rootDir string for { + // exit and propagate cancellation err as soon as we know about it + if err := ctx.Err(); err != nil { + return err + } + f, err := tr.Next() if err == io.EOF { break diff --git a/x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go b/x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go index fdd374eb72a..3073da25ca4 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go +++ b/x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go @@ -37,7 +37,7 @@ func NewInstaller(config *artifact.Config) (*Installer, error) { // Install performs installation of program in a specific version. // It expects package to be already downloaded. -func (i *Installer) Install(_ context.Context, spec program.Spec, version, installDir string) error { +func (i *Installer) Install(ctx context.Context, spec program.Spec, version, installDir string) error { artifactPath, err := artifact.GetArtifactPath(spec, version, i.config.OS(), i.config.Arch(), i.config.TargetDirectory) if err != nil { return err @@ -49,7 +49,7 @@ func (i *Installer) Install(_ context.Context, spec program.Spec, version, insta os.RemoveAll(installDir) } - if err := i.unzip(artifactPath); err != nil { + if err := i.unzip(ctx, artifactPath); err != nil { return err } @@ -69,7 +69,7 @@ func (i *Installer) Install(_ context.Context, spec program.Spec, version, insta return nil } -func (i *Installer) unzip(artifactPath string) error { +func (i *Installer) unzip(ctx context.Context, artifactPath string) error { r, err := zip.OpenReader(artifactPath) if err != nil { return err @@ -120,6 +120,11 @@ func (i *Installer) unzip(artifactPath string) error { } for _, f := range r.File { + // if we were cancelled in between + if err := ctx.Err(); err != nil { + return err + } + if err := unpackFile(f); err != nil { return err }