Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure allocDir is never nil and persisted safely #2838

Merged
merged 5 commits into from
Jul 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 53 additions & 43 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ type AllocRunner struct {
updater AllocStateUpdater
logger *log.Logger

// allocID is the ID of this runner's allocation. Since it does not
// change for the lifetime of the AllocRunner it is safe to read
// without acquiring a lock (unlike alloc).
allocID string

alloc *structs.Allocation
allocClientStatus string // Explicit status of allocation. Set when there are failures
allocClientDescription string
Expand Down Expand Up @@ -151,8 +156,10 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB,
updater: updater,
logger: logger,
alloc: alloc,
allocID: alloc.ID,
allocBroadcast: cstructs.NewAllocBroadcaster(0),
dirtyCh: make(chan struct{}, 1),
allocDir: allocdir.NewAllocDir(logger, filepath.Join(config.AllocDir, alloc.ID)),
tasks: make(map[string]*TaskRunner),
taskStates: copyTaskStates(alloc.TaskStates),
restored: make(map[string]struct{}),
Expand All @@ -173,7 +180,7 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB,
func (r *AllocRunner) pre060StateFilePath() string {
r.allocLock.Lock()
defer r.allocLock.Unlock()
path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, "state.json")
path := filepath.Join(r.config.StateDir, "alloc", r.allocID, "state.json")
return path
}

Expand All @@ -187,7 +194,7 @@ func (r *AllocRunner) RestoreState() error {
var upgrading bool
if err := pre060RestoreState(oldPath, &snap); err == nil {
// Restore fields
r.logger.Printf("[INFO] client: restoring pre v0.6.0 alloc runner state for alloc %q", r.alloc.ID)
r.logger.Printf("[INFO] client: restoring pre v0.6.0 alloc runner state for alloc %q", r.allocID)
r.alloc = snap.Alloc
r.allocDir = snap.AllocDir
r.allocClientStatus = snap.AllocClientStatus
Expand All @@ -201,7 +208,7 @@ func (r *AllocRunner) RestoreState() error {
// #2132 Upgrade path: if snap.AllocDir is nil, try to convert old
// Context struct to new AllocDir struct
if snap.AllocDir == nil && snap.Context != nil {
r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.alloc.ID)
r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.allocID)
r.allocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir)
for taskName := range snap.Context.AllocDir.TaskDirs {
r.allocDir.NewTaskDir(taskName)
Expand All @@ -217,7 +224,7 @@ func (r *AllocRunner) RestoreState() error {
} else {
// We are doing a normal restore
err := r.stateDB.View(func(tx *bolt.Tx) error {
bkt, err := getAllocationBucket(tx, r.alloc.ID)
bkt, err := getAllocationBucket(tx, r.allocID)
if err != nil {
return fmt.Errorf("failed to get allocation bucket: %v", err)
}
Expand Down Expand Up @@ -283,10 +290,11 @@ func (r *AllocRunner) RestoreState() error {

td, ok := r.allocDir.TaskDirs[name]
if !ok {
err := fmt.Errorf("failed to find task dir metadata for alloc %q task %q",
r.alloc.ID, name)
r.logger.Printf("[ERR] client: %v", err)
return err
// Create the task dir metadata if it doesn't exist.
// Since task dirs are created during r.Run() the
// client may save state and exit before all task dirs
// are created
td = r.allocDir.NewTaskDir(name)
}

tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient)
Expand All @@ -298,21 +306,21 @@ func (r *AllocRunner) RestoreState() error {
}

if restartReason, err := tr.RestoreState(); err != nil {
r.logger.Printf("[ERR] client: failed to restore state for alloc %s task %q: %v", r.alloc.ID, name, err)
r.logger.Printf("[ERR] client: failed to restore state for alloc %s task %q: %v", r.allocID, name, err)
mErr.Errors = append(mErr.Errors, err)
} else if !r.alloc.TerminalStatus() {
// Only start if the alloc isn't in a terminal status.
go tr.Run()

if upgrading {
if err := tr.SaveState(); err != nil {
r.logger.Printf("[WARN] client: initial save state for alloc %s task %s failed: %v", r.alloc.ID, name, err)
r.logger.Printf("[WARN] client: initial save state for alloc %s task %s failed: %v", r.allocID, name, err)
}
}

// Restart task runner if RestoreState gave a reason
if restartReason != "" {
r.logger.Printf("[INFO] client: restarting alloc %s task %s: %v", r.alloc.ID, name, restartReason)
r.logger.Printf("[INFO] client: restarting alloc %s task %s: %v", r.allocID, name, restartReason)
tr.Restart("upgrade", restartReason)
}
}
Expand Down Expand Up @@ -351,14 +359,14 @@ func (r *AllocRunner) saveAllocRunnerState() error {
r.allocLock.Unlock()

r.allocDirLock.Lock()
allocDir := r.allocDir
allocDir := r.allocDir.Copy()
r.allocDirLock.Unlock()

// Start the transaction.
return r.stateDB.Batch(func(tx *bolt.Tx) error {

// Grab the allocation bucket
allocBkt, err := getAllocationBucket(tx, r.alloc.ID)
allocBkt, err := getAllocationBucket(tx, r.allocID)
if err != nil {
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
}
Expand Down Expand Up @@ -399,7 +407,7 @@ func (r *AllocRunner) saveAllocRunnerState() error {
}

// Write the alloc dir data if it hasn't been written before and it exists.
if !r.allocDirPersisted && r.allocDir != nil {
if !r.allocDirPersisted && allocDir != nil {
if err := putObject(allocBkt, allocRunnerStateAllocDirKey, allocDir); err != nil {
return fmt.Errorf("failed to write alloc_runner allocDir state: %v", err)
}
Expand Down Expand Up @@ -427,15 +435,15 @@ func (r *AllocRunner) saveAllocRunnerState() error {
func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error {
if err := tr.SaveState(); err != nil {
return fmt.Errorf("failed to save state for alloc %s task '%s': %v",
r.alloc.ID, tr.task.Name, err)
r.allocID, tr.task.Name, err)
}
return nil
}

// DestroyState is used to cleanup after ourselves
func (r *AllocRunner) DestroyState() error {
return r.stateDB.Update(func(tx *bolt.Tx) error {
if err := deleteAllocationBucket(tx, r.alloc.ID); err != nil {
if err := deleteAllocationBucket(tx, r.allocID); err != nil {
return fmt.Errorf("failed to delete allocation bucket: %v", err)
}
return nil
Expand Down Expand Up @@ -549,7 +557,11 @@ func (r *AllocRunner) dirtySyncState() {
for {
select {
case <-r.dirtyCh:
r.syncStatus()
if err := r.syncStatus(); err != nil {
// Only WARN instead of ERR because we continue on
r.logger.Printf("[WARN] client: error persisting alloc %q state: %v",
r.allocID, err)
}
case <-r.ctx.Done():
return
}
Expand Down Expand Up @@ -685,30 +697,27 @@ func (r *AllocRunner) Run() {
alloc := r.Alloc()
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
r.logger.Printf("[ERR] client: alloc %q for missing task group %q", r.allocID, alloc.TaskGroup)
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("missing task group '%s'", alloc.TaskGroup))
return
}

// Create the execution context
r.allocDirLock.Lock()
if r.allocDir == nil {
// Build allocation directory
r.allocDir = allocdir.NewAllocDir(r.logger, filepath.Join(r.config.AllocDir, r.alloc.ID))
if err := r.allocDir.Build(); err != nil {
r.logger.Printf("[WARN] client: failed to build task directories: %v", err)
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup))
r.allocDirLock.Unlock()
return
}
// Build allocation directory (idempotent)
if err := r.allocDir.Build(); err != nil {
r.logger.Printf("[ERR] client: failed to build task directories: %v", err)
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup))
r.allocDirLock.Unlock()
return
}

if r.otherAllocDir != nil {
if err := r.allocDir.Move(r.otherAllocDir, tg.Tasks); err != nil {
r.logger.Printf("[ERROR] client: failed to move alloc dir into alloc %q: %v", r.alloc.ID, err)
}
if err := r.otherAllocDir.Destroy(); err != nil {
r.logger.Printf("[ERROR] client: error destroying allocdir %v: %v", r.otherAllocDir.AllocDir, err)
}
if r.otherAllocDir != nil {
if err := r.allocDir.Move(r.otherAllocDir, tg.Tasks); err != nil {
r.logger.Printf("[ERR] client: failed to move alloc dir into alloc %q: %v", r.allocID, err)
}
if err := r.otherAllocDir.Destroy(); err != nil {
r.logger.Printf("[ERR] client: error destroying allocdir %v: %v", r.otherAllocDir.AllocDir, err)
}
}
r.allocDirLock.Unlock()
Expand All @@ -717,9 +726,9 @@ func (r *AllocRunner) Run() {
// start any of the task runners and directly wait for the destroy signal to
// clean up the allocation.
if alloc.TerminalStatus() {
r.logger.Printf("[DEBUG] client: alloc %q in terminal status, waiting for destroy", r.alloc.ID)
r.logger.Printf("[DEBUG] client: alloc %q in terminal status, waiting for destroy", r.allocID)
r.handleDestroy()
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.allocID)
return
}

Expand All @@ -728,7 +737,7 @@ func (r *AllocRunner) Run() {
go r.watchHealth(wCtx)

// Start the task runners
r.logger.Printf("[DEBUG] client: starting task runners for alloc '%s'", r.alloc.ID)
r.logger.Printf("[DEBUG] client: starting task runners for alloc '%s'", r.allocID)
r.taskLock.Lock()
for _, task := range tg.Tasks {
if _, ok := r.restored[task.Name]; ok {
Expand Down Expand Up @@ -785,7 +794,8 @@ OUTER:
}

if err := r.syncStatus(); err != nil {
r.logger.Printf("[WARN] client: failed to sync status upon receiving alloc update: %v", err)
r.logger.Printf("[WARN] client: failed to sync alloc %q status upon receiving alloc update: %v",
r.allocID, err)
}
case <-r.ctx.Done():
taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled)
Expand All @@ -802,7 +812,7 @@ OUTER:
// Free up the context. It has likely exited already
watcherCancel()

r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.allocID)
}

// SetPreviousAllocDir sets the previous allocation directory of the current
Expand Down Expand Up @@ -850,16 +860,16 @@ func (r *AllocRunner) handleDestroy() {
case <-r.ctx.Done():
if err := r.DestroyContext(); err != nil {
r.logger.Printf("[ERR] client: failed to destroy context for alloc '%s': %v",
r.alloc.ID, err)
r.allocID, err)
}
if err := r.DestroyState(); err != nil {
r.logger.Printf("[ERR] client: failed to destroy state for alloc '%s': %v",
r.alloc.ID, err)
r.allocID, err)
}

return
case <-r.updateCh:
r.logger.Printf("[DEBUG] client: dropping update to terminal alloc '%s'", r.alloc.ID)
r.logger.Printf("[DEBUG] client: dropping update to terminal alloc '%s'", r.allocID)
}
}
}
Expand Down Expand Up @@ -905,7 +915,7 @@ func (r *AllocRunner) LatestAllocStats(taskFilter string) (*cstructs.AllocResour
tr, ok := r.tasks[taskFilter]
r.taskLock.RUnlock()
if !ok {
return nil, fmt.Errorf("allocation %q has no task %q", r.alloc.ID, taskFilter)
return nil, fmt.Errorf("allocation %q has no task %q", r.allocID, taskFilter)
}
l := tr.LatestResourceUsage()
if l != nil {
Expand Down
18 changes: 18 additions & 0 deletions client/allocdir/alloc_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,24 @@ func NewAllocDir(logger *log.Logger, allocDir string) *AllocDir {
}
}

// Copy an AllocDir and all of its TaskDirs. Returns nil if AllocDir is
// nil.
func (d *AllocDir) Copy() *AllocDir {
if d == nil {
return nil
}
dcopy := &AllocDir{
AllocDir: d.AllocDir,
SharedDir: d.SharedDir,
TaskDirs: make(map[string]*TaskDir, len(d.TaskDirs)),
logger: d.logger,
}
for k, v := range d.TaskDirs {
dcopy.TaskDirs[k] = v.Copy()
}
return dcopy
}

// NewTaskDir creates a new TaskDir and adds it to the AllocDirs TaskDirs map.
func (d *AllocDir) NewTaskDir(name string) *TaskDir {
td := newTaskDir(d.logger, d.AllocDir, name)
Expand Down
25 changes: 24 additions & 1 deletion client/allocdir/alloc_dir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/kr/pretty"
)

var (
Expand Down Expand Up @@ -50,7 +51,10 @@ var (
)

func testLogger() *log.Logger {
return log.New(os.Stderr, "", log.LstdFlags)
if testing.Verbose() {
return log.New(os.Stderr, "", log.LstdFlags)
}
return log.New(ioutil.Discard, "", log.LstdFlags)
}

// Test that AllocDir.Build builds just the alloc directory.
Expand Down Expand Up @@ -409,6 +413,25 @@ func TestAllocDir_CreateDir(t *testing.T) {
}
}

// TestAllocDir_Copy asserts that AllocDir.Copy does a deep copy of itself and
// all TaskDirs.
func TestAllocDir_Copy(t *testing.T) {
a := NewAllocDir(testLogger(), "foo")
a.NewTaskDir("bar")
a.NewTaskDir("baz")

b := a.Copy()
if diff := pretty.Diff(a, b); len(diff) > 0 {
t.Errorf("differences between copies: %# v", pretty.Formatter(diff))
}

// Make sure TaskDirs map is copied
a.NewTaskDir("new")
if b.TaskDirs["new"] != nil {
t.Errorf("TaskDirs map shared between copied")
}
}

func TestPathFuncs(t *testing.T) {
dir, err := ioutil.TempDir("", "nomadtest-pathfuncs")
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions client/allocdir/task_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ func newTaskDir(logger *log.Logger, allocDir, taskName string) *TaskDir {
}
}

// Copy a TaskDir. Panics if TaskDir is nil as TaskDirs should never be nil.
func (t *TaskDir) Copy() *TaskDir {
// No nested structures other than the logger which is safe to share,
// so just copy the struct
tcopy := *t
return &tcopy
}

// Build default directories and permissions in a task directory. chrootCreated
// allows skipping chroot creation if the caller knows it has already been
// done.
Expand Down