Skip to content

Commit

Permalink
Cherry-pick #21745 to 7.10: Agent atomic installer (#21778)
Browse files Browse the repository at this point in the history
* [Ingest Manager] Syncing unpacked files (#21706)

[Ingest Manager] Syncing unpacked files (#21706)

* [Ingest Manager] Change Sync/Close call order (#21735)

[Ingest Manager] Change Sync/Close call order (#21735)

* [Ingest Manager] Agent atomic installer (#21745)

[Ingest Manager] Agent atomic installer (#21745)
  • Loading branch information
michalpristas authored Oct 14, 2020
1 parent eebe220 commit 2890b7e
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 3 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- Copy Action store on upgrade {pull}21298[21298]
- Include inputs in action store actions {pull}21298[21298]
- Fix issue where inputs without processors defined would panic {pull}21628[21628]
- Partial extracted beat result in failure to spawn beat {issue}21718[21718]

==== New features

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package atomic

import (
"context"
"io/ioutil"
"os"
"path/filepath"
)

type embeddedInstaller interface {
Install(ctx context.Context, programName, version, installDir string) error
}

// Installer installs into temporary destination and moves to correct one after
// successful finish.
type Installer struct {
installer embeddedInstaller
}

// NewInstaller creates a new AtomicInstaller
func NewInstaller(i embeddedInstaller) (*Installer, error) {
return &Installer{
installer: i,
}, nil
}

// Install performs installation of program in a specific version.
func (i *Installer) Install(ctx context.Context, programName, version, installDir string) error {
// tar installer uses Dir of installDir to determine location of unpack
tempDir, err := ioutil.TempDir(os.TempDir(), "elastic-agent-install")
if err != nil {
return err
}
tempInstallDir := filepath.Join(tempDir, filepath.Base(installDir))

// cleanup install directory before Install
if _, err := os.Stat(installDir); err == nil || os.IsExist(err) {
os.RemoveAll(installDir)
}

if _, err := os.Stat(tempInstallDir); err == nil || os.IsExist(err) {
os.RemoveAll(tempInstallDir)
}

if err := i.installer.Install(ctx, programName, version, tempInstallDir); err != nil {
// cleanup unfinished install
os.RemoveAll(tempInstallDir)
return err
}

if err := os.Rename(tempInstallDir, installDir); err != nil {
os.RemoveAll(installDir)
os.RemoveAll(tempInstallDir)
return err
}

return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package atomic

import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

func TestOKInstall(t *testing.T) {
sig := make(chan int)
ti := &testInstaller{sig}
var wg sync.WaitGroup
i, err := NewInstaller(ti)

assert.NoError(t, err)

ctx := context.Background()
installDir := filepath.Join(os.TempDir(), "install_dir")

wg.Add(1)
go func() {
err := i.Install(ctx, "a", "b", installDir)
assert.NoError(t, err)
wg.Done()
}()

// signal to process next files
close(sig)

wg.Wait()

assert.DirExists(t, installDir)
files := getFiles()

for name := range files {
path := filepath.Join(installDir, name)
assert.FileExists(t, path)
}

os.RemoveAll(installDir)
}

func TestContextCancelledInstall(t *testing.T) {
sig := make(chan int)
ti := &testInstaller{sig}
var wg sync.WaitGroup
i, err := NewInstaller(ti)

assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
installDir := filepath.Join(os.TempDir(), "install_dir")

wg.Add(1)
go func() {
err := i.Install(ctx, "a", "b", installDir)
assert.Error(t, err)
wg.Done()
}()

// cancel before signaling
cancel()
close(sig)

wg.Wait()

assert.NoDirExists(t, installDir)
}

type testInstaller struct {
signal chan int
}

func (ti *testInstaller) Install(ctx context.Context, programName, version, installDir string) error {
files := getFiles()
if err := os.MkdirAll(installDir, 0777); err != nil {
return err
}

for name, content := range files {
if err := ctx.Err(); err != nil {
return err
}

filename := filepath.Join(installDir, name)
if err := ioutil.WriteFile(filename, content, 0666); err != nil {
return err
}

// wait for all but last
<-ti.signal
}

return nil
}

func getFiles() map[string][]byte {
files := make(map[string][]byte)
fileCount := 3
for i := 1; i <= fileCount; i++ {
files[fmt.Sprintf("file_%d", i)] = []byte(fmt.Sprintf("content of file %d", i))
}

return files
}
8 changes: 7 additions & 1 deletion x-pack/elastic-agent/pkg/artifact/install/installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/dir"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/atomic"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/hooks"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/tar"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/zip"
Expand Down Expand Up @@ -60,5 +61,10 @@ func NewInstaller(config *artifact.Config) (InstallerChecker, error) {
return nil, err
}

return hooks.NewInstallerChecker(installer, dir.NewChecker())
atomicInstaller, err := atomic.NewInstaller(installer)
if err != nil {
return nil, err
}

return hooks.NewInstallerChecker(atomicInstaller, dir.NewChecker())
}
10 changes: 10 additions & 0 deletions x-pack/elastic-agent/pkg/artifact/install/tar/tar_installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,19 @@ func unpack(r io.Reader, dir string) error {
}

_, err = io.Copy(wf, tr)

if err == nil {
// sometimes we try executing binary too fast and run into text file busy after unpacking
// syncing prevents this
if syncErr := wf.Sync(); syncErr != nil {
err = syncErr
}
}

if closeErr := wf.Close(); closeErr != nil && err == nil {
err = closeErr
}

if err != nil {
return fmt.Errorf("TarInstaller: error writing to %s: %v", abs, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,18 @@ func (i *Installer) unzip(artifactPath string) error {
return err
}
defer func() {
if cerr := f.Close(); cerr != nil {
err = multierror.Append(err, cerr)
if closeErr := f.Close(); closeErr != nil {
err = multierror.Append(err, closeErr)
}
}()

if _, err = io.Copy(f, rc); err != nil {
return err
}

// sometimes we try executing binary too fast and run into text file busy after unpacking
// syncing prevents this
f.Sync()
}
return nil
}
Expand Down

0 comments on commit 2890b7e

Please sign in to comment.