diff --git a/client/alloc_runner.go b/client/alloc_runner.go index ebd21e95ec6..80898f71581 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -51,6 +51,11 @@ type AllocRunner struct { updater AllocStateUpdater logger *log.Logger + // allocID is the ID of this runner's allocation. Since it does not + // change for the lifetime of the AllocRunner it is safe to read + // without acquiring a lock (unlike alloc). + allocID string + alloc *structs.Allocation allocClientStatus string // Explicit status of allocation. Set when there are failures allocClientDescription string @@ -151,8 +156,10 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, updater: updater, logger: logger, alloc: alloc, + allocID: alloc.ID, allocBroadcast: cstructs.NewAllocBroadcaster(0), dirtyCh: make(chan struct{}, 1), + allocDir: allocdir.NewAllocDir(logger, filepath.Join(config.AllocDir, alloc.ID)), tasks: make(map[string]*TaskRunner), taskStates: copyTaskStates(alloc.TaskStates), restored: make(map[string]struct{}), @@ -173,7 +180,7 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, func (r *AllocRunner) pre060StateFilePath() string { r.allocLock.Lock() defer r.allocLock.Unlock() - path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, "state.json") + path := filepath.Join(r.config.StateDir, "alloc", r.allocID, "state.json") return path } @@ -187,7 +194,7 @@ func (r *AllocRunner) RestoreState() error { var upgrading bool if err := pre060RestoreState(oldPath, &snap); err == nil { // Restore fields - r.logger.Printf("[INFO] client: restoring pre v0.6.0 alloc runner state for alloc %q", r.alloc.ID) + r.logger.Printf("[INFO] client: restoring pre v0.6.0 alloc runner state for alloc %q", r.allocID) r.alloc = snap.Alloc r.allocDir = snap.AllocDir r.allocClientStatus = snap.AllocClientStatus @@ -201,7 +208,7 @@ func (r *AllocRunner) RestoreState() error { // #2132 Upgrade path: if snap.AllocDir is nil, try to convert old // Context struct to new AllocDir struct if snap.AllocDir == nil && snap.Context != nil { - r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.alloc.ID) + r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.allocID) r.allocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir) for taskName := range snap.Context.AllocDir.TaskDirs { r.allocDir.NewTaskDir(taskName) @@ -217,7 +224,7 @@ func (r *AllocRunner) RestoreState() error { } else { // We are doing a normal restore err := r.stateDB.View(func(tx *bolt.Tx) error { - bkt, err := getAllocationBucket(tx, r.alloc.ID) + bkt, err := getAllocationBucket(tx, r.allocID) if err != nil { return fmt.Errorf("failed to get allocation bucket: %v", err) } @@ -283,10 +290,11 @@ func (r *AllocRunner) RestoreState() error { td, ok := r.allocDir.TaskDirs[name] if !ok { - err := fmt.Errorf("failed to find task dir metadata for alloc %q task %q", - r.alloc.ID, name) - r.logger.Printf("[ERR] client: %v", err) - return err + // Create the task dir metadata if it doesn't exist. + // Since task dirs are created during r.Run() the + // client may save state and exit before all task dirs + // are created + td = r.allocDir.NewTaskDir(name) } tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient) @@ -298,7 +306,7 @@ func (r *AllocRunner) RestoreState() error { } if restartReason, err := tr.RestoreState(); err != nil { - r.logger.Printf("[ERR] client: failed to restore state for alloc %s task %q: %v", r.alloc.ID, name, err) + r.logger.Printf("[ERR] client: failed to restore state for alloc %s task %q: %v", r.allocID, name, err) mErr.Errors = append(mErr.Errors, err) } else if !r.alloc.TerminalStatus() { // Only start if the alloc isn't in a terminal status. @@ -306,13 +314,13 @@ func (r *AllocRunner) RestoreState() error { if upgrading { if err := tr.SaveState(); err != nil { - r.logger.Printf("[WARN] client: initial save state for alloc %s task %s failed: %v", r.alloc.ID, name, err) + r.logger.Printf("[WARN] client: initial save state for alloc %s task %s failed: %v", r.allocID, name, err) } } // Restart task runner if RestoreState gave a reason if restartReason != "" { - r.logger.Printf("[INFO] client: restarting alloc %s task %s: %v", r.alloc.ID, name, restartReason) + r.logger.Printf("[INFO] client: restarting alloc %s task %s: %v", r.allocID, name, restartReason) tr.Restart("upgrade", restartReason) } } @@ -351,14 +359,14 @@ func (r *AllocRunner) saveAllocRunnerState() error { r.allocLock.Unlock() r.allocDirLock.Lock() - allocDir := r.allocDir + allocDir := r.allocDir.Copy() r.allocDirLock.Unlock() // Start the transaction. return r.stateDB.Batch(func(tx *bolt.Tx) error { // Grab the allocation bucket - allocBkt, err := getAllocationBucket(tx, r.alloc.ID) + allocBkt, err := getAllocationBucket(tx, r.allocID) if err != nil { return fmt.Errorf("failed to retrieve allocation bucket: %v", err) } @@ -399,7 +407,7 @@ func (r *AllocRunner) saveAllocRunnerState() error { } // Write the alloc dir data if it hasn't been written before and it exists. - if !r.allocDirPersisted && r.allocDir != nil { + if !r.allocDirPersisted && allocDir != nil { if err := putObject(allocBkt, allocRunnerStateAllocDirKey, allocDir); err != nil { return fmt.Errorf("failed to write alloc_runner allocDir state: %v", err) } @@ -427,7 +435,7 @@ func (r *AllocRunner) saveAllocRunnerState() error { func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error { if err := tr.SaveState(); err != nil { return fmt.Errorf("failed to save state for alloc %s task '%s': %v", - r.alloc.ID, tr.task.Name, err) + r.allocID, tr.task.Name, err) } return nil } @@ -435,7 +443,7 @@ func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error { // DestroyState is used to cleanup after ourselves func (r *AllocRunner) DestroyState() error { return r.stateDB.Update(func(tx *bolt.Tx) error { - if err := deleteAllocationBucket(tx, r.alloc.ID); err != nil { + if err := deleteAllocationBucket(tx, r.allocID); err != nil { return fmt.Errorf("failed to delete allocation bucket: %v", err) } return nil @@ -549,7 +557,11 @@ func (r *AllocRunner) dirtySyncState() { for { select { case <-r.dirtyCh: - r.syncStatus() + if err := r.syncStatus(); err != nil { + // Only WARN instead of ERR because we continue on + r.logger.Printf("[WARN] client: error persisting alloc %q state: %v", + r.allocID, err) + } case <-r.ctx.Done(): return } @@ -685,30 +697,27 @@ func (r *AllocRunner) Run() { alloc := r.Alloc() tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) if tg == nil { - r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup) + r.logger.Printf("[ERR] client: alloc %q for missing task group %q", r.allocID, alloc.TaskGroup) r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("missing task group '%s'", alloc.TaskGroup)) return } // Create the execution context r.allocDirLock.Lock() - if r.allocDir == nil { - // Build allocation directory - r.allocDir = allocdir.NewAllocDir(r.logger, filepath.Join(r.config.AllocDir, r.alloc.ID)) - if err := r.allocDir.Build(); err != nil { - r.logger.Printf("[WARN] client: failed to build task directories: %v", err) - r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup)) - r.allocDirLock.Unlock() - return - } + // Build allocation directory (idempotent) + if err := r.allocDir.Build(); err != nil { + r.logger.Printf("[ERR] client: failed to build task directories: %v", err) + r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup)) + r.allocDirLock.Unlock() + return + } - if r.otherAllocDir != nil { - if err := r.allocDir.Move(r.otherAllocDir, tg.Tasks); err != nil { - r.logger.Printf("[ERROR] client: failed to move alloc dir into alloc %q: %v", r.alloc.ID, err) - } - if err := r.otherAllocDir.Destroy(); err != nil { - r.logger.Printf("[ERROR] client: error destroying allocdir %v: %v", r.otherAllocDir.AllocDir, err) - } + if r.otherAllocDir != nil { + if err := r.allocDir.Move(r.otherAllocDir, tg.Tasks); err != nil { + r.logger.Printf("[ERR] client: failed to move alloc dir into alloc %q: %v", r.allocID, err) + } + if err := r.otherAllocDir.Destroy(); err != nil { + r.logger.Printf("[ERR] client: error destroying allocdir %v: %v", r.otherAllocDir.AllocDir, err) } } r.allocDirLock.Unlock() @@ -717,9 +726,9 @@ func (r *AllocRunner) Run() { // start any of the task runners and directly wait for the destroy signal to // clean up the allocation. if alloc.TerminalStatus() { - r.logger.Printf("[DEBUG] client: alloc %q in terminal status, waiting for destroy", r.alloc.ID) + r.logger.Printf("[DEBUG] client: alloc %q in terminal status, waiting for destroy", r.allocID) r.handleDestroy() - r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID) + r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.allocID) return } @@ -728,7 +737,7 @@ func (r *AllocRunner) Run() { go r.watchHealth(wCtx) // Start the task runners - r.logger.Printf("[DEBUG] client: starting task runners for alloc '%s'", r.alloc.ID) + r.logger.Printf("[DEBUG] client: starting task runners for alloc '%s'", r.allocID) r.taskLock.Lock() for _, task := range tg.Tasks { if _, ok := r.restored[task.Name]; ok { @@ -785,7 +794,8 @@ OUTER: } if err := r.syncStatus(); err != nil { - r.logger.Printf("[WARN] client: failed to sync status upon receiving alloc update: %v", err) + r.logger.Printf("[WARN] client: failed to sync alloc %q status upon receiving alloc update: %v", + r.allocID, err) } case <-r.ctx.Done(): taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled) @@ -802,7 +812,7 @@ OUTER: // Free up the context. It has likely exited already watcherCancel() - r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID) + r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.allocID) } // SetPreviousAllocDir sets the previous allocation directory of the current @@ -850,16 +860,16 @@ func (r *AllocRunner) handleDestroy() { case <-r.ctx.Done(): if err := r.DestroyContext(); err != nil { r.logger.Printf("[ERR] client: failed to destroy context for alloc '%s': %v", - r.alloc.ID, err) + r.allocID, err) } if err := r.DestroyState(); err != nil { r.logger.Printf("[ERR] client: failed to destroy state for alloc '%s': %v", - r.alloc.ID, err) + r.allocID, err) } return case <-r.updateCh: - r.logger.Printf("[DEBUG] client: dropping update to terminal alloc '%s'", r.alloc.ID) + r.logger.Printf("[DEBUG] client: dropping update to terminal alloc '%s'", r.allocID) } } } @@ -905,7 +915,7 @@ func (r *AllocRunner) LatestAllocStats(taskFilter string) (*cstructs.AllocResour tr, ok := r.tasks[taskFilter] r.taskLock.RUnlock() if !ok { - return nil, fmt.Errorf("allocation %q has no task %q", r.alloc.ID, taskFilter) + return nil, fmt.Errorf("allocation %q has no task %q", r.allocID, taskFilter) } l := tr.LatestResourceUsage() if l != nil { diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index 1850cef1c6e..ece467c1b39 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ -98,6 +98,24 @@ func NewAllocDir(logger *log.Logger, allocDir string) *AllocDir { } } +// Copy an AllocDir and all of its TaskDirs. Returns nil if AllocDir is +// nil. +func (d *AllocDir) Copy() *AllocDir { + if d == nil { + return nil + } + dcopy := &AllocDir{ + AllocDir: d.AllocDir, + SharedDir: d.SharedDir, + TaskDirs: make(map[string]*TaskDir, len(d.TaskDirs)), + logger: d.logger, + } + for k, v := range d.TaskDirs { + dcopy.TaskDirs[k] = v.Copy() + } + return dcopy +} + // NewTaskDir creates a new TaskDir and adds it to the AllocDirs TaskDirs map. func (d *AllocDir) NewTaskDir(name string) *TaskDir { td := newTaskDir(d.logger, d.AllocDir, name) diff --git a/client/allocdir/alloc_dir_test.go b/client/allocdir/alloc_dir_test.go index 7ed7c13b774..774d84ed3cf 100644 --- a/client/allocdir/alloc_dir_test.go +++ b/client/allocdir/alloc_dir_test.go @@ -16,6 +16,7 @@ import ( cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/testutil" "github.com/hashicorp/nomad/nomad/structs" + "github.com/kr/pretty" ) var ( @@ -50,7 +51,10 @@ var ( ) func testLogger() *log.Logger { - return log.New(os.Stderr, "", log.LstdFlags) + if testing.Verbose() { + return log.New(os.Stderr, "", log.LstdFlags) + } + return log.New(ioutil.Discard, "", log.LstdFlags) } // Test that AllocDir.Build builds just the alloc directory. @@ -409,6 +413,25 @@ func TestAllocDir_CreateDir(t *testing.T) { } } +// TestAllocDir_Copy asserts that AllocDir.Copy does a deep copy of itself and +// all TaskDirs. +func TestAllocDir_Copy(t *testing.T) { + a := NewAllocDir(testLogger(), "foo") + a.NewTaskDir("bar") + a.NewTaskDir("baz") + + b := a.Copy() + if diff := pretty.Diff(a, b); len(diff) > 0 { + t.Errorf("differences between copies: %# v", pretty.Formatter(diff)) + } + + // Make sure TaskDirs map is copied + a.NewTaskDir("new") + if b.TaskDirs["new"] != nil { + t.Errorf("TaskDirs map shared between copied") + } +} + func TestPathFuncs(t *testing.T) { dir, err := ioutil.TempDir("", "nomadtest-pathfuncs") if err != nil { diff --git a/client/allocdir/task_dir.go b/client/allocdir/task_dir.go index b4376057213..bcf6e6da8b9 100644 --- a/client/allocdir/task_dir.go +++ b/client/allocdir/task_dir.go @@ -57,6 +57,14 @@ func newTaskDir(logger *log.Logger, allocDir, taskName string) *TaskDir { } } +// Copy a TaskDir. Panics if TaskDir is nil as TaskDirs should never be nil. +func (t *TaskDir) Copy() *TaskDir { + // No nested structures other than the logger which is safe to share, + // so just copy the struct + tcopy := *t + return &tcopy +} + // Build default directories and permissions in a task directory. chrootCreated // allows skipping chroot creation if the caller knows it has already been // done.