Skip to content

Commit

Permalink
[Ingest Manager] Make installer atomic on windows (elastic#24253)
Browse files Browse the repository at this point in the history
[Ingest Manager] Make installer atomic on windows (elastic#24253)
  • Loading branch information
michalpristas committed Mar 2, 2021
1 parent 9ddc25c commit 5ebb425
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 11 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
- Windows agent doesn't uninstall with a lowercase `c:` drive in the path {pull}23998[23998]
- Fix reloading of log level for services {pull}[24055]24055
- Fix: Successfully installed and enrolled agent running standalone{pull}[24128]24128
- Make installer atomic on windows {pull}[24253]24253

==== New features

Expand Down
11 changes: 11 additions & 0 deletions x-pack/elastic-agent/pkg/agent/operation/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ const (
isMonitoringLogsFlag = 1 << 1
)

type waiter interface {
Wait()
}

// Operator runs Start/Stop/Update operations
// it is responsible for detecting reconnect to existing processes
// based on backed up configuration
Expand Down Expand Up @@ -182,6 +186,13 @@ func (o *Operator) HandleConfig(cfg configrequest.Request) error {

// Shutdown handles shutting down the running apps for Agent shutdown.
func (o *Operator) Shutdown() {
// wait for installer and downloader
if awaitable, ok := o.installer.(waiter); ok {
o.logger.Infof("waiting for installer of pipeline '%s' to finish", o.pipelineID)
awaitable.Wait()
o.logger.Debugf("pipeline installer '%s' done", o.pipelineID)
}

for _, app := range o.apps {
app.Shutdown()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"io/ioutil"
"os"
"path/filepath"
"runtime"

"github.com/hashicorp/go-multierror"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
Expand Down Expand Up @@ -51,15 +54,28 @@ func (i *Installer) Install(ctx context.Context, spec program.Spec, version, ins

if err := i.installer.Install(ctx, spec, version, tempInstallDir); err != nil {
// cleanup unfinished install
os.RemoveAll(tempInstallDir)
if rerr := os.RemoveAll(tempInstallDir); rerr != nil {
err = multierror.Append(err, rerr)
}
return err
}

if err := os.Rename(tempInstallDir, installDir); err != nil {
os.RemoveAll(installDir)
os.RemoveAll(tempInstallDir)
if rerr := os.RemoveAll(installDir); rerr != nil {
err = multierror.Append(err, rerr)
}
if rerr := os.RemoveAll(tempInstallDir); rerr != nil {
err = multierror.Append(err, rerr)
}
return err
}

// on windows rename is not atomic, let's force it to flush the cache
if runtime.GOOS == "windows" {
if f, err := os.OpenFile(installDir, os.O_SYNC|os.O_RDWR, 0755); err == nil {
f.Sync()
}
}

return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package awaitable

import (
"context"
"sync"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
)

type embeddedInstaller interface {
Install(ctx context.Context, spec program.Spec, version, installDir string) error
}

type embeddedChecker interface {
Check(ctx context.Context, spec program.Spec, version, installDir string) error
}

// Installer installs into temporary destination and moves to correct one after
// successful finish.
type Installer struct {
installer embeddedInstaller
checker embeddedChecker
wg sync.WaitGroup
}

// NewInstaller creates a new AtomicInstaller
func NewInstaller(i embeddedInstaller, ch embeddedChecker) (*Installer, error) {
return &Installer{
installer: i,
checker: ch,
}, nil
}

// Wait allows caller to wait for install to be finished
func (i *Installer) Wait() {
i.wg.Wait()
}

// Install performs installation of program in a specific version.
func (i *Installer) Install(ctx context.Context, spec program.Spec, version, installDir string) error {
i.wg.Add(1)
defer i.wg.Done()

return i.installer.Install(ctx, spec, version, installDir)
}

// Check performs installation checks
func (i *Installer) Check(ctx context.Context, spec program.Spec, version, installDir string) error {
i.wg.Add(1)
defer i.wg.Done()

return i.checker.Check(ctx, spec, version, installDir)
}
18 changes: 16 additions & 2 deletions x-pack/elastic-agent/pkg/artifact/install/installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/atomic"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/awaitable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/dir"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/hooks"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/tar"
Expand Down Expand Up @@ -39,12 +40,20 @@ type InstallerChecker interface {
Check(ctx context.Context, spec program.Spec, version, installDir string) error
}

// AwaitableInstallerChecker is an interface that installs, checks but also is awaitable to check when actions are done.
type AwaitableInstallerChecker interface {
InstallerChecker

// Waits for its work to be done.
Wait()
}

// NewInstaller returns a correct installer associated with a
// package type:
// - rpm -> rpm installer
// - deb -> deb installer
// - binary -> zip installer on windows, tar installer on linux and mac
func NewInstaller(config *artifact.Config) (InstallerChecker, error) {
func NewInstaller(config *artifact.Config) (AwaitableInstallerChecker, error) {
if config == nil {
return nil, ErrConfigNotProvided
}
Expand All @@ -66,5 +75,10 @@ func NewInstaller(config *artifact.Config) (InstallerChecker, error) {
return nil, err
}

return hooks.NewInstallerChecker(atomicInstaller, dir.NewChecker())
hooksInstaller, err := hooks.NewInstallerChecker(atomicInstaller, dir.NewChecker())
if err != nil {
return nil, err
}

return awaitable.NewInstaller(hooksInstaller, hooksInstaller)
}
11 changes: 8 additions & 3 deletions x-pack/elastic-agent/pkg/artifact/install/tar/tar_installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewInstaller(config *artifact.Config) (*Installer, error) {

// Install performs installation of program in a specific version.
// It expects package to be already downloaded.
func (i *Installer) Install(_ context.Context, spec program.Spec, version, installDir string) error {
func (i *Installer) Install(ctx context.Context, spec program.Spec, version, installDir string) error {
artifactPath, err := artifact.GetArtifactPath(spec, version, i.config.OS(), i.config.Arch(), i.config.TargetDirectory)
if err != nil {
return err
Expand All @@ -53,10 +53,10 @@ func (i *Installer) Install(_ context.Context, spec program.Spec, version, insta

// unpack must occur in directory that holds the installation directory
// or the extraction will be double nested
return unpack(f, filepath.Dir(installDir))
return unpack(ctx, f, filepath.Dir(installDir))
}

func unpack(r io.Reader, dir string) error {
func unpack(ctx context.Context, r io.Reader, dir string) error {
zr, err := gzip.NewReader(r)
if err != nil {
return errors.New("requires gzip-compressed body", err, errors.TypeFilesystem)
Expand All @@ -66,6 +66,11 @@ func unpack(r io.Reader, dir string) error {
var rootDir string

for {
// exit and propagate cancellation err as soon as we know about it
if err := ctx.Err(); err != nil {
return err
}

f, err := tr.Next()
if err == io.EOF {
break
Expand Down
11 changes: 8 additions & 3 deletions x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewInstaller(config *artifact.Config) (*Installer, error) {

// Install performs installation of program in a specific version.
// It expects package to be already downloaded.
func (i *Installer) Install(_ context.Context, spec program.Spec, version, installDir string) error {
func (i *Installer) Install(ctx context.Context, spec program.Spec, version, installDir string) error {
artifactPath, err := artifact.GetArtifactPath(spec, version, i.config.OS(), i.config.Arch(), i.config.TargetDirectory)
if err != nil {
return err
Expand All @@ -49,7 +49,7 @@ func (i *Installer) Install(_ context.Context, spec program.Spec, version, insta
os.RemoveAll(installDir)
}

if err := i.unzip(artifactPath); err != nil {
if err := i.unzip(ctx, artifactPath); err != nil {
return err
}

Expand All @@ -69,7 +69,7 @@ func (i *Installer) Install(_ context.Context, spec program.Spec, version, insta
return nil
}

func (i *Installer) unzip(artifactPath string) error {
func (i *Installer) unzip(ctx context.Context, artifactPath string) error {
r, err := zip.OpenReader(artifactPath)
if err != nil {
return err
Expand Down Expand Up @@ -120,6 +120,11 @@ func (i *Installer) unzip(artifactPath string) error {
}

for _, f := range r.File {
// if we were cancelled in between
if err := ctx.Err(); err != nil {
return err
}

if err := unpackFile(f); err != nil {
return err
}
Expand Down

0 comments on commit 5ebb425

Please sign in to comment.