diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 35a58e1cd05..6275bd1dd5a 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/boltdb/bolt" "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" @@ -27,6 +28,13 @@ const ( taskReceivedSyncLimit = 30 * time.Second ) +var ( + // The following are the key paths written to the state database + allocRunnerStateImmutableKey = []byte("immutable") + allocRunnerStateMutableKey = []byte("mutable") + allocRunnerStateAllocDirKey = []byte("alloc-dir") +) + // AllocStateUpdater is used to update the status of an allocation type AllocStateUpdater func(alloc *structs.Allocation) @@ -69,10 +77,18 @@ type AllocRunner struct { destroyLock sync.Mutex waitCh chan struct{} - // serialize saveAllocRunnerState calls - persistLock sync.Mutex + // State related fields + // stateDB is used to store the alloc runners state + stateDB *bolt.DB + + // immutablePersisted and allocDirPersisted are used to track whether the + // immutable data and the alloc dir have been persisted. Once persisted we + // can lower write volume by not re-writing these values + immutablePersisted bool + allocDirPersisted bool } +// COMPAT: Remove in 0.7.0 // allocRunnerState is used to snapshot the state of the alloc runner type allocRunnerState struct { Version string @@ -95,13 +111,29 @@ type allocRunnerState struct { } `json:"Context,omitempty"` } +// allocRunnerImmutableState is state that only has to be written once as it +// doesn't change over the life-cycle of the alloc_runner. +type allocRunnerImmutableState struct { + Version string + Alloc *structs.Allocation +} + +// allocRunnerMutableState is state that has to be written on each save as it +// changes over the life-cycle of the alloc_runner. +type allocRunnerMutableState struct { + AllocClientStatus string + AllocClientDescription string + TaskStates map[string]*structs.TaskState +} + // NewAllocRunner is used to create a new allocation context -func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater, +func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, updater AllocStateUpdater, alloc *structs.Allocation, vaultClient vaultclient.VaultClient, consulClient ConsulServiceAPI) *AllocRunner { ar := &AllocRunner{ config: config, + stateDB: stateDB, updater: updater, logger: logger, alloc: alloc, @@ -118,8 +150,10 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStat return ar } -// stateFilePath returns the path to our state file -func (r *AllocRunner) stateFilePath() string { +// pre060StateFilePath returns the path to our state file that would have been +// written pre v0.6.0 +// COMPAT: Remove in 0.7.0 +func (r *AllocRunner) pre060StateFilePath() string { r.allocLock.Lock() defer r.allocLock.Unlock() path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, "state.json") @@ -128,28 +162,79 @@ func (r *AllocRunner) stateFilePath() string { // RestoreState is used to restore the state of the alloc runner func (r *AllocRunner) RestoreState() error { - // Load the snapshot + + // COMPAT: Remove in 0.7.0 + // Check if the old snapshot is there + oldPath := r.pre060StateFilePath() var snap allocRunnerState - if err := restoreState(r.stateFilePath(), &snap); err != nil { + var upgrading bool + if err := pre060RestoreState(oldPath, &snap); err == nil { + // Restore fields + r.logger.Printf("[INFO] client: restoring pre v0.6.0 alloc runner state for alloc %q", r.alloc.ID) + r.alloc = snap.Alloc + r.allocDir = snap.AllocDir + r.allocClientStatus = snap.AllocClientStatus + r.allocClientDescription = snap.AllocClientDescription + + if r.alloc != nil { + r.taskStates = snap.Alloc.TaskStates + } + + // COMPAT: Remove in 0.7.0 + // #2132 Upgrade path: if snap.AllocDir is nil, try to convert old + // Context struct to new AllocDir struct + if snap.AllocDir == nil && snap.Context != nil { + r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.alloc.ID) + r.allocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir) + for taskName := range snap.Context.AllocDir.TaskDirs { + r.allocDir.NewTaskDir(taskName) + } + } + + // Delete the old state + os.RemoveAll(oldPath) + upgrading = true + } else if !os.IsNotExist(err) { + // Something corrupt in the old state file return err - } + } else { + // We are doing a normal restore + err := r.stateDB.View(func(tx *bolt.Tx) error { + bkt, err := getAllocationBucket(tx, r.alloc.ID) + if err != nil { + return fmt.Errorf("failed to get allocation bucket: %v", err) + } - // #2132 Upgrade path: if snap.AllocDir is nil, try to convert old - // Context struct to new AllocDir struct - if snap.AllocDir == nil && snap.Context != nil { - r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.alloc.ID) - snap.AllocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir) - for taskName := range snap.Context.AllocDir.TaskDirs { - snap.AllocDir.NewTaskDir(taskName) + // Get the state objects + var mutable allocRunnerMutableState + var immutable allocRunnerImmutableState + var allocDir allocdir.AllocDir + + if err := getObject(bkt, allocRunnerStateImmutableKey, &immutable); err != nil { + return fmt.Errorf("failed to read alloc runner immutable state: %v", err) + } + if err := getObject(bkt, allocRunnerStateMutableKey, &mutable); err != nil { + return fmt.Errorf("failed to read alloc runner mutable state: %v", err) + } + if err := getObject(bkt, allocRunnerStateAllocDirKey, &allocDir); err != nil { + return fmt.Errorf("failed to read alloc runner alloc_dir state: %v", err) + } + + // Populate the fields + r.alloc = immutable.Alloc + r.allocDir = &allocDir + r.allocClientStatus = mutable.AllocClientStatus + r.allocClientDescription = mutable.AllocClientDescription + r.taskStates = mutable.TaskStates + r.alloc.ClientStatus = getClientStatus(r.taskStates) + return nil + }) + + if err != nil { + return fmt.Errorf("failed to read allocation state: %v", err) } } - // Restore fields - r.alloc = snap.Alloc - r.allocDir = snap.AllocDir - r.allocClientStatus = snap.AllocClientStatus - r.allocClientDescription = snap.AllocClientDescription - var snapshotErrors multierror.Error if r.alloc == nil { snapshotErrors.Errors = append(snapshotErrors.Errors, fmt.Errorf("alloc_runner snapshot includes a nil allocation")) @@ -161,11 +246,17 @@ func (r *AllocRunner) RestoreState() error { return e } - r.taskStates = snap.Alloc.TaskStates + tg := r.alloc.Job.LookupTaskGroup(r.alloc.TaskGroup) + if tg == nil { + return fmt.Errorf("restored allocation doesn't contain task group %q", r.alloc.TaskGroup) + } // Restore the task runners var mErr multierror.Error - for name, state := range r.taskStates { + for _, task := range tg.Tasks { + name := task.Name + state := r.taskStates[name] + // Mark the task as restored. r.restored[name] = struct{}{} @@ -177,8 +268,7 @@ func (r *AllocRunner) RestoreState() error { return err } - task := &structs.Task{Name: name} - tr := NewTaskRunner(r.logger, r.config, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient) + tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient) r.tasks[name] = tr // Skip tasks in terminal states. @@ -193,22 +283,23 @@ func (r *AllocRunner) RestoreState() error { // Only start if the alloc isn't in a terminal status. go tr.Run() + if upgrading { + if err := tr.SaveState(); err != nil { + r.logger.Printf("[WARN] client: initial save state for alloc %s task %s failed: %v", r.alloc.ID, name, err) + } + } + // Restart task runner if RestoreState gave a reason if restartReason != "" { + r.logger.Printf("[INFO] client: restarting alloc %s task %s: %v", r.alloc.ID, name, restartReason) tr.Restart("upgrade", restartReason) } } - } return mErr.ErrorOrNil() } -// GetAllocDir returns the alloc dir for the alloc runner -func (r *AllocRunner) GetAllocDir() *allocdir.AllocDir { - return r.allocDir -} - // SaveState is used to snapshot the state of the alloc runner // if the fullSync is marked as false only the state of the Alloc Runner // is snapshotted. If fullSync is marked as true, we snapshot @@ -230,10 +321,7 @@ func (r *AllocRunner) SaveState() error { } func (r *AllocRunner) saveAllocRunnerState() error { - r.persistLock.Lock() - defer r.persistLock.Unlock() - - // Create the snapshot. + // Grab all the relevant data alloc := r.Alloc() r.allocLock.Lock() @@ -245,14 +333,55 @@ func (r *AllocRunner) saveAllocRunnerState() error { allocDir := r.allocDir r.allocDirLock.Unlock() - snap := allocRunnerState{ - Version: r.config.Version, - Alloc: alloc, - AllocDir: allocDir, - AllocClientStatus: allocClientStatus, - AllocClientDescription: allocClientDescription, - } - return persistState(r.stateFilePath(), &snap) + // Start the transaction. + return r.stateDB.Batch(func(tx *bolt.Tx) error { + + // Grab the allocation bucket + allocBkt, err := getAllocationBucket(tx, r.alloc.ID) + if err != nil { + return fmt.Errorf("failed to retrieve allocation bucket: %v", err) + } + + // Write the immutable data + if !r.immutablePersisted { + immutable := &allocRunnerImmutableState{ + Alloc: alloc, + Version: r.config.Version, + } + + if err := putObject(allocBkt, allocRunnerStateImmutableKey, &immutable); err != nil { + return fmt.Errorf("failed to write alloc_runner immutable state: %v", err) + } + + tx.OnCommit(func() { + r.immutablePersisted = true + }) + } + + // Write the alloc dir data if it hasn't been written before and it exists. + if !r.allocDirPersisted && r.allocDir != nil { + if err := putObject(allocBkt, allocRunnerStateAllocDirKey, allocDir); err != nil { + return fmt.Errorf("failed to write alloc_runner allocDir state: %v", err) + } + + tx.OnCommit(func() { + r.allocDirPersisted = true + }) + } + + // Write the mutable state every time + mutable := &allocRunnerMutableState{ + AllocClientStatus: allocClientStatus, + AllocClientDescription: allocClientDescription, + TaskStates: alloc.TaskStates, + } + + if err := putObject(allocBkt, allocRunnerStateMutableKey, &mutable); err != nil { + return fmt.Errorf("failed to write alloc_runner mutable state: %v", err) + } + + return nil + }) } func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error { @@ -265,7 +394,12 @@ func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error { // DestroyState is used to cleanup after ourselves func (r *AllocRunner) DestroyState() error { - return os.RemoveAll(filepath.Dir(r.stateFilePath())) + return r.stateDB.Update(func(tx *bolt.Tx) error { + if err := deleteAllocationBucket(tx, r.alloc.ID); err != nil { + return fmt.Errorf("failed to delete allocation bucket: %v", err) + } + return nil + }) } // DestroyContext is used to destroy the context @@ -273,6 +407,11 @@ func (r *AllocRunner) DestroyContext() error { return r.allocDir.Destroy() } +// GetAllocDir returns the alloc dir for the alloc runner +func (r *AllocRunner) GetAllocDir() *allocdir.AllocDir { + return r.allocDir +} + // copyTaskStates returns a copy of the passed task states. func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.TaskState { copy := make(map[string]*structs.TaskState, len(states)) @@ -285,8 +424,20 @@ func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.Ta // Alloc returns the associated allocation func (r *AllocRunner) Alloc() *structs.Allocation { r.allocLock.Lock() + + // Clear the job before copying + job := r.alloc.Job + + // Since we are clearing the job, anything that access the alloc.Job field + // must acquire the lock or access it via this method. + r.alloc.Job = nil + alloc := r.alloc.Copy() + // Restore + r.alloc.Job = job + alloc.Job = job + // The status has explicitly been set. if r.allocClientStatus != "" || r.allocClientDescription != "" { alloc.ClientStatus = r.allocClientStatus @@ -303,10 +454,19 @@ func (r *AllocRunner) Alloc() *structs.Allocation { r.allocLock.Unlock() // Scan the task states to determine the status of the alloc - var pending, running, dead, failed bool r.taskStatusLock.RLock() alloc.TaskStates = copyTaskStates(r.taskStates) - for _, state := range r.taskStates { + alloc.ClientStatus = getClientStatus(r.taskStates) + r.taskStatusLock.RUnlock() + + return alloc +} + +// getClientStatus takes in the task states for a given allocation and computes +// the client status +func getClientStatus(taskStates map[string]*structs.TaskState) string { + var pending, running, dead, failed bool + for _, state := range taskStates { switch state.State { case structs.TaskStateRunning: running = true @@ -320,20 +480,19 @@ func (r *AllocRunner) Alloc() *structs.Allocation { } } } - r.taskStatusLock.RUnlock() // Determine the alloc status if failed { - alloc.ClientStatus = structs.AllocClientStatusFailed + return structs.AllocClientStatusFailed } else if running { - alloc.ClientStatus = structs.AllocClientStatusRunning + return structs.AllocClientStatusRunning } else if pending { - alloc.ClientStatus = structs.AllocClientStatusPending + return structs.AllocClientStatusPending } else if dead { - alloc.ClientStatus = structs.AllocClientStatusComplete + return structs.AllocClientStatusComplete } - return alloc + return "" } // dirtySyncState is used to watch for state being marked dirty to sync @@ -469,7 +628,7 @@ func (r *AllocRunner) Run() { go r.dirtySyncState() // Find the task group to run in the allocation - alloc := r.alloc + alloc := r.Alloc() tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) if tg == nil { r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup) @@ -522,7 +681,7 @@ func (r *AllocRunner) Run() { taskdir := r.allocDir.NewTaskDir(task.Name) r.allocDirLock.Unlock() - tr := NewTaskRunner(r.logger, r.config, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient, r.consulClient) + tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient, r.consulClient) r.tasks[task.Name] = tr tr.MarkReceived() @@ -555,6 +714,10 @@ OUTER: for _, tr := range runners { tr.Update(update) } + + if err := r.syncStatus(); err != nil { + r.logger.Printf("[WARN] client: failed to sync status upon receiving alloc update: %v", err) + } case <-r.destroyCh: taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled) break OUTER diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 0f845daf93d..ee3b2e0c0d4 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -10,6 +10,7 @@ import ( "text/template" "time" + "github.com/boltdb/bolt" "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -36,13 +37,15 @@ func testAllocRunnerFromAlloc(alloc *structs.Allocation, restarts bool) (*MockAl conf.Node = mock.Node() conf.StateDir = os.TempDir() conf.AllocDir = os.TempDir() + tmp, _ := ioutil.TempFile("", "state-db") + db, _ := bolt.Open(tmp.Name(), 0600, nil) upd := &MockAllocStateUpdater{} if !restarts { *alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0} alloc.Job.Type = structs.JobTypeBatch } vclient := vaultclient.NewMockVaultClient() - ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient, newMockConsulServiceClient()) + ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, newMockConsulServiceClient()) return upd, ar } @@ -171,9 +174,15 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) { return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete) } - // Check the state still exists - if _, err := os.Stat(ar.stateFilePath()); err != nil { - return false, fmt.Errorf("state file destroyed: %v", err) + // Check the allocation state still exists + if err := ar.stateDB.View(func(tx *bolt.Tx) error { + if !allocationBucketExists(tx, ar.Alloc().ID) { + return fmt.Errorf("no bucket for alloc") + } + + return nil + }); err != nil { + return false, fmt.Errorf("state destroyed") } // Check the alloc directory still exists @@ -201,10 +210,14 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) { } // Check the state was cleaned - if _, err := os.Stat(ar.stateFilePath()); err == nil { - return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath()) - } else if !os.IsNotExist(err) { - return false, fmt.Errorf("stat err: %v", err) + if err := ar.stateDB.View(func(tx *bolt.Tx) error { + if allocationBucketExists(tx, ar.Alloc().ID) { + return fmt.Errorf("bucket for alloc exists") + } + + return nil + }); err != nil { + return false, fmt.Errorf("state not destroyed") } // Check the alloc directory was cleaned @@ -249,10 +262,14 @@ func TestAllocRunner_Destroy(t *testing.T) { } // Check the state was cleaned - if _, err := os.Stat(ar.stateFilePath()); err == nil { - return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath()) - } else if !os.IsNotExist(err) { - return false, fmt.Errorf("stat err: %v", err) + if err := ar.stateDB.View(func(tx *bolt.Tx) error { + if allocationBucketExists(tx, ar.Alloc().ID) { + return fmt.Errorf("bucket for alloc exists") + } + + return nil + }); err != nil { + return false, fmt.Errorf("state not destroyed") } // Check the alloc directory was cleaned @@ -324,7 +341,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { // Create a new alloc runner l2 := prefixedTestLogger("----- ar2: ") - ar2 := NewAllocRunner(l2, ar.config, upd.Update, + ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update, &structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient) err = ar2.RestoreState() @@ -368,12 +385,10 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { } func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { - ctestutil.ExecCompatible(t) upd, ar := testAllocRunner(false) ar.logger = prefixedTestLogger("ar1: ") // Ensure task takes some time - ar.alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" task := ar.alloc.Job.TaskGroups[0].Tasks[0] task.Config["run_for"] = "10s" @@ -410,14 +425,14 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { } // Ensure ar1 doesn't recreate the state file - ar.persistLock.Lock() - defer ar.persistLock.Unlock() + ar.allocLock.Lock() + defer ar.allocLock.Unlock() // Ensure both alloc runners don't destroy ar.destroy = true // Create a new alloc runner - ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update, + ar2 := NewAllocRunner(ar.logger, ar.config, ar.stateDB, upd.Update, &structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient) ar2.logger = prefixedTestLogger("ar2: ") err = ar2.RestoreState() @@ -429,8 +444,14 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { testutil.WaitForResult(func() (bool, error) { // Check the state still exists - if _, err := os.Stat(ar.stateFilePath()); err != nil { - return false, fmt.Errorf("state file destroyed: %v", err) + if err := ar.stateDB.View(func(tx *bolt.Tx) error { + if !allocationBucketExists(tx, ar2.Alloc().ID) { + return fmt.Errorf("no bucket for alloc") + } + + return nil + }); err != nil { + return false, fmt.Errorf("state destroyed") } // Check the alloc directory still exists @@ -459,10 +480,14 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { } // Check the state was cleaned - if _, err := os.Stat(ar.stateFilePath()); err == nil { - return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath()) - } else if !os.IsNotExist(err) { - return false, fmt.Errorf("stat err: %v", err) + if err := ar.stateDB.View(func(tx *bolt.Tx) error { + if allocationBucketExists(tx, ar2.Alloc().ID) { + return fmt.Errorf("bucket for alloc exists") + } + + return nil + }); err != nil { + return false, fmt.Errorf("state not destroyed") } // Check the alloc directory was cleaned @@ -497,7 +522,14 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) { // Snapshot state testutil.WaitForResult(func() (bool, error) { - return len(ar.tasks) == 1, nil + if upd.Count == 0 { + return false, fmt.Errorf("No updates") + } + last := upd.Allocs[upd.Count-1] + if last.ClientStatus != structs.AllocClientStatusRunning { + return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning) + } + return true, nil }, func(err error) { t.Fatalf("task never started: %v", err) }) @@ -509,9 +541,7 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) { // Create a new alloc runner l2 := prefixedTestLogger("----- ar2: ") - ar2 := NewAllocRunner(l2, origConfig, upd.Update, - &structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, - ar.consulClient) + ar2 := NewAllocRunner(l2, origConfig, ar.stateDB, upd.Update, &structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient) err = ar2.RestoreState() if err != nil { t.Fatalf("err: %v", err) @@ -527,14 +557,14 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) { return false, nil } - for _, ev := range ar2.alloc.TaskStates["web"].Events { + for _, ev := range ar2.taskStates["web"].Events { if strings.HasSuffix(ev.RestartReason, pre06ScriptCheckReason) { return true, nil } } return false, fmt.Errorf("no restart with proper reason found") }, func(err error) { - t.Fatalf("err: %v\nAllocs: %#v\nWeb State: %#v", err, upd.Allocs, ar2.alloc.TaskStates["web"]) + t.Fatalf("err: %v\nAllocs: %#v\nWeb State: %#v", err, upd.Allocs, ar2.taskStates["web"]) }) // Destroy and wait @@ -584,6 +614,14 @@ func TestAllocRunner_RestoreOldState(t *testing.T) { conf := config.DefaultConfig() conf.StateDir = os.TempDir() conf.AllocDir = os.TempDir() + tmp, err := ioutil.TempFile("", "state-db") + if err != nil { + t.Fatalf("error creating state db file: %v", err) + } + db, err := bolt.Open(tmp.Name(), 0600, nil) + if err != nil { + t.Fatalf("error creating state db: %v", err) + } if err := os.MkdirAll(filepath.Join(conf.StateDir, "alloc", alloc.ID), 0777); err != nil { t.Fatalf("error creating state dir: %v", err) @@ -655,7 +693,7 @@ func TestAllocRunner_RestoreOldState(t *testing.T) { alloc.Job.Type = structs.JobTypeBatch vclient := vaultclient.NewMockVaultClient() cclient := newMockConsulServiceClient() - ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient, cclient) + ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, cclient) defer ar.Destroy() // RestoreState should fail on the task state since we only test the @@ -671,7 +709,7 @@ func TestAllocRunner_RestoreOldState(t *testing.T) { if len(merr.Errors) != 1 { t.Fatalf("expected exactly 1 error from RestoreState but found: %d: %v", len(merr.Errors), err) } - if expected := "task runner snapshot includes nil Task"; merr.Errors[0].Error() != expected { + if expected := "failed to get task bucket"; !strings.Contains(merr.Errors[0].Error(), expected) { t.Fatalf("expected %q but got: %q", expected, merr.Errors[0].Error()) } diff --git a/client/client.go b/client/client.go index 89e14823b85..59460bf1199 100644 --- a/client/client.go +++ b/client/client.go @@ -16,6 +16,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/boltdb/bolt" consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" "github.com/hashicorp/go-multierror" @@ -99,6 +100,9 @@ type Client struct { config *config.Config start time.Time + // stateDB is used to efficiently store client state. + stateDB *bolt.DB + // configCopy is a copy that should be passed to alloc-runners. configCopy *config.Config configLock sync.RWMutex @@ -340,6 +344,13 @@ func (c *Client) init() error { } c.logger.Printf("[INFO] client: using state directory %v", c.config.StateDir) + // Create or open the state database + db, err := bolt.Open(filepath.Join(c.config.StateDir, "state.db"), 0600, nil) + if err != nil { + return fmt.Errorf("failed to create state database", err) + } + c.stateDB = db + // Ensure the alloc dir exists if we have one if c.config.AllocDir != "" { if err := os.MkdirAll(c.config.AllocDir, 0755); err != nil { @@ -410,6 +421,13 @@ func (c *Client) Shutdown() error { return nil } + // Defer closing the database + defer func() { + if err := c.stateDB.Close(); err != nil { + c.logger.Printf("[ERR] client: failed to close state database on shutdown: %v", err) + } + }() + // Stop renewing tokens and secrets if c.vaultClient != nil { c.vaultClient.Stop() @@ -590,49 +608,106 @@ func (c *Client) restoreState() error { return nil } + // COMPAT: Remove in 0.7.0 + // 0.6.0 transistioned from individual state files to a single bolt-db. + // The upgrade path is to: + // Check if old state exists + // If so, restore from that and delete old state + // Restore using state database + + // Allocs holds the IDs of the allocations being restored + var allocs []string + + // Upgrading tracks whether this is a pre 0.6.0 upgrade path + var upgrading bool + // Scan the directory - list, err := ioutil.ReadDir(filepath.Join(c.config.StateDir, "alloc")) - if err != nil && os.IsNotExist(err) { - return nil - } else if err != nil { + allocDir := filepath.Join(c.config.StateDir, "alloc") + list, err := ioutil.ReadDir(allocDir) + if err != nil && !os.IsNotExist(err) { return fmt.Errorf("failed to list alloc state: %v", err) + } else if err == nil && len(list) != 0 { + upgrading = true + for _, entry := range list { + allocs = append(allocs, entry.Name()) + } + } else { + // Normal path + err := c.stateDB.View(func(tx *bolt.Tx) error { + allocs, err = getAllAllocationIDs(tx) + if err != nil { + return fmt.Errorf("failed to list allocations: %v", err) + } + return nil + }) + if err != nil { + return err + } } // Load each alloc back var mErr multierror.Error - for _, entry := range list { - id := entry.Name() + for _, id := range allocs { alloc := &structs.Allocation{ID: id} + c.configLock.RLock() - ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient, c.consulService) + ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService) c.configLock.RUnlock() + c.allocLock.Lock() c.allocs[id] = ar c.allocLock.Unlock() + if err := ar.RestoreState(); err != nil { c.logger.Printf("[ERR] client: failed to restore state for alloc %s: %v", id, err) mErr.Errors = append(mErr.Errors, err) } else { go ar.Run() + + if upgrading { + if err := ar.SaveState(); err != nil { + c.logger.Printf("[WARN] client: initial save state for alloc %s failed: %v", id, err) + } + } } } + + // Delete all the entries + if upgrading { + if err := os.RemoveAll(allocDir); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } + return mErr.ErrorOrNil() } -// saveState is used to snapshot our state into the data dir +// saveState is used to snapshot our state into the data dir. func (c *Client) saveState() error { if c.config.DevMode { return nil } + var wg sync.WaitGroup + var l sync.Mutex var mErr multierror.Error - for id, ar := range c.getAllocRunners() { - if err := ar.SaveState(); err != nil { - c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", - id, err) - mErr.Errors = append(mErr.Errors, err) - } + runners := c.getAllocRunners() + wg.Add(len(runners)) + + for id, ar := range runners { + go func(id string, ar *AllocRunner) { + err := ar.SaveState() + if err != nil { + c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", id, err) + l.Lock() + multierror.Append(&mErr, err) + l.Unlock() + } + wg.Done() + }(id, ar) } + + wg.Wait() return mErr.ErrorOrNil() } @@ -1466,8 +1541,7 @@ func (c *Client) runAllocs(update *allocUpdates) { // Remove the old allocations for _, remove := range diff.removed { if err := c.removeAlloc(remove); err != nil { - c.logger.Printf("[ERR] client: failed to remove alloc '%s': %v", - remove.ID, err) + c.logger.Printf("[ERR] client: failed to remove alloc '%s': %v", remove.ID, err) } } @@ -1542,11 +1616,6 @@ func (c *Client) runAllocs(update *allocUpdates) { add.ID, err) } } - - // Persist our state - if err := c.saveState(); err != nil { - c.logger.Printf("[ERR] client: failed to save state: %v", err) - } } // blockForRemoteAlloc blocks until the previous allocation of an allocation has @@ -1892,9 +1961,14 @@ func (c *Client) addAlloc(alloc *structs.Allocation, prevAllocDir *allocdir.Allo defer c.allocLock.Unlock() c.configLock.RLock() - ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient, c.consulService) + ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService) ar.SetPreviousAllocDir(prevAllocDir) c.configLock.RUnlock() + + if err := ar.SaveState(); err != nil { + c.logger.Printf("[WARN] client: initial save state for alloc %q failed: %v", alloc.ID, err) + } + go ar.Run() // Store the alloc runner. diff --git a/client/driver/driver.go b/client/driver/driver.go index 9fe2135896d..ee3298c6c41 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -2,8 +2,10 @@ package driver import ( "context" + "crypto/md5" "errors" "fmt" + "io" "log" "os" "strings" @@ -150,6 +152,20 @@ func (r *CreatedResources) Merge(o *CreatedResources) { } } +func (r *CreatedResources) Hash() []byte { + h := md5.New() + + for k, values := range r.Resources { + io.WriteString(h, k) + io.WriteString(h, "values") + for i, v := range values { + io.WriteString(h, fmt.Sprintf("%d-%v", i, v)) + } + } + + return h.Sum(nil) +} + // Driver is used for execution of tasks. This allows Nomad // to support many pluggable implementations of task drivers. // Examples could include LXC, Docker, Qemu, etc. diff --git a/client/state_database.go b/client/state_database.go new file mode 100644 index 00000000000..4a5784a6a7e --- /dev/null +++ b/client/state_database.go @@ -0,0 +1,214 @@ +package client + +import ( + "bytes" + "fmt" + + "github.com/boltdb/bolt" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/ugorji/go/codec" +) + +/* +The client has a boltDB backed state store. The schema as of 0.6 looks as follows: + +allocations/ (bucket) +|--> / (bucket) + |--> alloc_runner persisted objects (k/v) + |--> / (bucket) + |--> task_runner persisted objects (k/v) +*/ + +var ( + // allocationsBucket is the bucket name containing all allocation related + // data + allocationsBucket = []byte("allocations") +) + +func putObject(bkt *bolt.Bucket, key []byte, obj interface{}) error { + if !bkt.Writable() { + return fmt.Errorf("bucket must be writable") + } + + // Serialize the object + var buf bytes.Buffer + if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(obj); err != nil { + return fmt.Errorf("failed to encode passed object: %v", err) + } + + if err := bkt.Put(key, buf.Bytes()); err != nil { + return fmt.Errorf("failed to write data at key %v: %v", string(key), err) + } + + return nil +} + +func putData(bkt *bolt.Bucket, key, value []byte) error { + if !bkt.Writable() { + return fmt.Errorf("bucket must be writable") + } + + if err := bkt.Put(key, value); err != nil { + return fmt.Errorf("failed to write data at key %v: %v", string(key), err) + } + + return nil +} + +func getObject(bkt *bolt.Bucket, key []byte, obj interface{}) error { + // Get the data + data := bkt.Get(key) + if data == nil { + return fmt.Errorf("no data at key %v", string(key)) + } + + // Deserialize the object + if err := codec.NewDecoderBytes(data, structs.MsgpackHandle).Decode(obj); err != nil { + return fmt.Errorf("failed to decode data into passed object: %v", err) + } + + return nil +} + +// getAllocationBucket returns the bucket used to persist state about a +// particular allocation. If the root allocation bucket or the specific +// allocation bucket doesn't exist, it will be created as long as the +// transaction is writable. +func getAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) { + var err error + w := tx.Writable() + + // Retrieve the root allocations bucket + allocations := tx.Bucket(allocationsBucket) + if allocations == nil { + if !w { + return nil, fmt.Errorf("Allocations bucket doesn't exist and transaction is not writable") + } + + allocations, err = tx.CreateBucket(allocationsBucket) + if err != nil { + return nil, err + } + } + + // Retrieve the specific allocations bucket + key := []byte(allocID) + alloc := allocations.Bucket(key) + if alloc == nil { + if !w { + return nil, fmt.Errorf("Allocation bucket doesn't exist and transaction is not writable") + } + + alloc, err = allocations.CreateBucket(key) + if err != nil { + return nil, err + } + } + + return alloc, nil +} + +func allocationBucketExists(tx *bolt.Tx, allocID string) bool { + allocations := tx.Bucket(allocationsBucket) + if allocations == nil { + return false + } + + // Retrieve the specific allocations bucket + alloc := allocations.Bucket([]byte(allocID)) + return alloc != nil +} + +// getTaskBucket returns the bucket used to persist state about a +// particular task. If the root allocation bucket, the specific +// allocation or task bucket doesn't exist, they will be created as long as the +// transaction is writable. +func getTaskBucket(tx *bolt.Tx, allocID, taskName string) (*bolt.Bucket, error) { + alloc, err := getAllocationBucket(tx, allocID) + if err != nil { + return nil, err + } + + // Retrieve the specific task bucket + w := tx.Writable() + key := []byte(taskName) + task := alloc.Bucket(key) + if task == nil { + if !w { + return nil, fmt.Errorf("Task bucket doesn't exist and transaction is not writable") + } + + task, err = alloc.CreateBucket(key) + if err != nil { + return nil, err + } + } + + return task, nil +} + +// deleteAllocationBucket is used to delete an allocation bucket if it exists. +func deleteAllocationBucket(tx *bolt.Tx, allocID string) error { + if !tx.Writable() { + return fmt.Errorf("transaction must be writable") + } + + // Retrieve the root allocations bucket + allocations := tx.Bucket(allocationsBucket) + if allocations == nil { + return nil + } + + // Check if the bucket exists + key := []byte(allocID) + if allocBkt := allocations.Bucket(key); allocBkt == nil { + return nil + } + + return allocations.DeleteBucket(key) +} + +// deleteTaskBucket is used to delete a task bucket if it exists. +func deleteTaskBucket(tx *bolt.Tx, allocID, taskName string) error { + if !tx.Writable() { + return fmt.Errorf("transaction must be writable") + } + + // Retrieve the root allocations bucket + allocations := tx.Bucket(allocationsBucket) + if allocations == nil { + return nil + } + + // Retrieve the specific allocations bucket + alloc := allocations.Bucket([]byte(allocID)) + if alloc == nil { + return nil + } + + // Check if the bucket exists + key := []byte(taskName) + if taskBkt := alloc.Bucket(key); taskBkt == nil { + return nil + } + + return alloc.DeleteBucket(key) +} + +func getAllAllocationIDs(tx *bolt.Tx) ([]string, error) { + allocationsBkt := tx.Bucket(allocationsBucket) + if allocationsBkt == nil { + return nil, nil + } + + // Create a cursor for iteration. + var allocIDs []string + c := allocationsBkt.Cursor() + + // Iterate over all the buckets + for k, _ := c.First(); k != nil; k, _ = c.Next() { + allocIDs = append(allocIDs, string(k)) + } + + return allocIDs, nil +} diff --git a/client/task_runner.go b/client/task_runner.go index 078e70dcbe6..296f1b5f274 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -1,9 +1,11 @@ package client import ( + "bytes" "crypto/md5" "encoding/hex" "fmt" + "io" "io/ioutil" "log" "os" @@ -13,6 +15,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/boltdb/bolt" "github.com/golang/snappy" "github.com/hashicorp/consul-template/signals" "github.com/hashicorp/go-multierror" @@ -23,6 +26,7 @@ import ( "github.com/hashicorp/nomad/client/getter" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/nomad/structs" + "github.com/ugorji/go/codec" "github.com/hashicorp/nomad/client/driver/env" dstructs "github.com/hashicorp/nomad/client/driver/structs" @@ -55,8 +59,15 @@ const ( vaultTokenFile = "vault_token" ) +var ( + // taskRunnerStateAllKey holds all the task runners state. At the moment + // there is no need to split it + taskRunnerStateAllKey = []byte("simple-all") +) + // TaskRunner is used to wrap a task within an allocation and provide the execution context. type TaskRunner struct { + stateDB *bolt.DB config *config.Config updater TaskStateUpdater logger *log.Logger @@ -143,17 +154,33 @@ type TaskRunner struct { // AllocRunner, so all state fields must be synchronized using this // lock. persistLock sync.Mutex + + // persistedHash is the hash of the last persisted snapshot. It is used to + // detect if a new snapshot has to be writen to disk. + persistedHash []byte } // taskRunnerState is used to snapshot the state of the task runner type taskRunnerState struct { Version string - Task *structs.Task HandleID string ArtifactDownloaded bool TaskDirBuilt bool - CreatedResources *driver.CreatedResources PayloadRendered bool + CreatedResources *driver.CreatedResources +} + +func (s *taskRunnerState) Hash() []byte { + h := md5.New() + + io.WriteString(h, s.Version) + io.WriteString(h, s.HandleID) + io.WriteString(h, fmt.Sprintf("%v", s.ArtifactDownloaded)) + io.WriteString(h, fmt.Sprintf("%v", s.TaskDirBuilt)) + io.WriteString(h, fmt.Sprintf("%v", s.PayloadRendered)) + h.Write(s.CreatedResources.Hash()) + + return h.Sum(nil) } // TaskStateUpdater is used to signal that tasks state has changed. @@ -173,7 +200,7 @@ type SignalEvent struct { // NewTaskRunner is used to create a new task context func NewTaskRunner(logger *log.Logger, config *config.Config, - updater TaskStateUpdater, taskDir *allocdir.TaskDir, + stateDB *bolt.DB, updater TaskStateUpdater, taskDir *allocdir.TaskDir, alloc *structs.Allocation, task *structs.Task, vaultClient vaultclient.VaultClient, consulClient ConsulServiceAPI) *TaskRunner { @@ -190,6 +217,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, tc := &TaskRunner{ config: config, + stateDB: stateDB, updater: updater, logger: logger, restartTracker: restartTracker, @@ -222,17 +250,17 @@ func (r *TaskRunner) WaitCh() <-chan struct{} { return r.waitCh } -// stateFilePath returns the path to our state file -func (r *TaskRunner) stateFilePath() string { +// pre060StateFilePath returns the path to our state file that would have been +// written pre v0.6.0 +// COMPAT: Remove in 0.7.0 +func (r *TaskRunner) pre060StateFilePath() string { // Get the MD5 of the task name hashVal := md5.Sum([]byte(r.task.Name)) hashHex := hex.EncodeToString(hashVal[:]) dirName := fmt.Sprintf("task-%s", hashHex) // Generate the path - path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, - dirName, "state.json") - return path + return filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, dirName, "state.json") } // RestoreState is used to restore our state. If a non-empty string is returned @@ -240,22 +268,46 @@ func (r *TaskRunner) stateFilePath() string { // backwards incompatible upgrades that need to restart tasks with a new // executor. func (r *TaskRunner) RestoreState() (string, error) { - // Load the snapshot + // COMPAT: Remove in 0.7.0 + // 0.6.0 transistioned from individual state files to a single bolt-db. + // The upgrade path is to: + // Check if old state exists + // If so, restore from that and delete old state + // Restore using state database + var snap taskRunnerState - if err := restoreState(r.stateFilePath(), &snap); err != nil { - return "", err - } - // Restore fields - if snap.Task == nil { - return "", fmt.Errorf("task runner snapshot includes nil Task") + // Check if the old snapshot is there + oldPath := r.pre060StateFilePath() + if err := pre060RestoreState(oldPath, &snap); err == nil { + // Delete the old state + os.RemoveAll(oldPath) + } else if !os.IsNotExist(err) { + // Something corrupt in the old state file + return "", err } else { - r.task = snap.Task + // We are doing a normal restore + err := r.stateDB.View(func(tx *bolt.Tx) error { + bkt, err := getTaskBucket(tx, r.alloc.ID, r.task.Name) + if err != nil { + return fmt.Errorf("failed to get task bucket: %v", err) + } + + if err := getObject(bkt, taskRunnerStateAllKey, &snap); err != nil { + return fmt.Errorf("failed to read task runner state: %v", err) + } + return nil + }) + if err != nil { + return "", err + } + } + + // Restore fields from the snapshot r.artifactsDownloaded = snap.ArtifactDownloaded r.taskDirBuilt = snap.TaskDirBuilt r.payloadRendered = snap.PayloadRendered - r.setCreatedResources(snap.CreatedResources) if err := r.setTaskEnv(); err != nil { @@ -357,9 +409,7 @@ func pre06ScriptCheck(ver, driver string, services []*structs.Service) bool { func (r *TaskRunner) SaveState() error { r.persistLock.Lock() defer r.persistLock.Unlock() - snap := taskRunnerState{ - Task: r.task, Version: r.config.Version, ArtifactDownloaded: r.artifactsDownloaded, TaskDirBuilt: r.taskDirBuilt, @@ -372,7 +422,38 @@ func (r *TaskRunner) SaveState() error { snap.HandleID = r.handle.ID() } r.handleLock.Unlock() - return persistState(r.stateFilePath(), &snap) + + // If nothing has changed avoid the write + h := snap.Hash() + if bytes.Equal(h, r.persistedHash) { + return nil + } + + // Serialize the object + var buf bytes.Buffer + if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(&snap); err != nil { + return fmt.Errorf("failed to serialize snapshot: %v", err) + } + + // Start the transaction. + return r.stateDB.Batch(func(tx *bolt.Tx) error { + // Grab the task bucket + taskBkt, err := getTaskBucket(tx, r.alloc.ID, r.task.Name) + if err != nil { + return fmt.Errorf("failed to retrieve allocation bucket: %v", err) + } + + if err := putData(taskBkt, taskRunnerStateAllKey, buf.Bytes()); err != nil { + return fmt.Errorf("failed to write task_runner state: %v", err) + } + + // Store the hash that was persisted + tx.OnCommit(func() { + r.persistedHash = h + }) + + return nil + }) } // DestroyState is used to cleanup after ourselves @@ -380,7 +461,12 @@ func (r *TaskRunner) DestroyState() error { r.persistLock.Lock() defer r.persistLock.Unlock() - return os.RemoveAll(r.stateFilePath()) + return r.stateDB.Update(func(tx *bolt.Tx) error { + if err := deleteTaskBucket(tx, r.alloc.ID, r.task.Name); err != nil { + return fmt.Errorf("failed to delete task bucket: %v", err) + } + return nil + }) } // setState is used to update the state of the task runner diff --git a/client/task_runner_test.go b/client/task_runner_test.go index 3e75062e1ad..cfac5ea4256 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -13,6 +13,7 @@ import ( "testing" "time" + "github.com/boltdb/bolt" "github.com/golang/snappy" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" @@ -77,6 +78,16 @@ func testTaskRunnerFromAlloc(t *testing.T, restarts bool, alloc *structs.Allocat conf.Node = mock.Node() conf.StateDir = os.TempDir() conf.AllocDir = os.TempDir() + + tmp, err := ioutil.TempFile("", "state-db") + if err != nil { + t.Fatalf("error creating state db file: %v", err) + } + db, err := bolt.Open(tmp.Name(), 0600, nil) + if err != nil { + t.Fatalf("error creating state db: %v", err) + } + upd := &MockTaskStateUpdater{} task := alloc.Job.TaskGroups[0].Tasks[0] // Initialize the port listing. This should be done by the offer process but @@ -106,7 +117,7 @@ func testTaskRunnerFromAlloc(t *testing.T, restarts bool, alloc *structs.Allocat vclient := vaultclient.NewMockVaultClient() cclient := newMockConsulServiceClient() - tr := NewTaskRunner(logger, conf, upd.Update, taskDir, alloc, task, vclient, cclient) + tr := NewTaskRunner(logger, conf, db, upd.Update, taskDir, alloc, task, vclient, cclient) if !restarts { tr.restartTracker = noRestartsTracker() } @@ -366,8 +377,8 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) { } // Create a new task runner - task2 := &structs.Task{Name: ctx.tr.task.Name, Driver: ctx.tr.task.Driver} - tr2 := NewTaskRunner(ctx.tr.logger, ctx.tr.config, ctx.upd.Update, + task2 := &structs.Task{Name: ctx.tr.task.Name, Driver: ctx.tr.task.Driver, Vault: ctx.tr.task.Vault} + tr2 := NewTaskRunner(ctx.tr.logger, ctx.tr.config, ctx.tr.stateDB, ctx.upd.Update, ctx.tr.taskDir, ctx.tr.alloc, task2, ctx.tr.vaultClient, ctx.tr.consul) tr2.restartTracker = noRestartsTracker() if _, err := tr2.RestoreState(); err != nil { diff --git a/client/util.go b/client/util.go index 3f7cef981e7..32f765550c5 100644 --- a/client/util.go +++ b/client/util.go @@ -5,8 +5,6 @@ import ( "fmt" "io/ioutil" "math/rand" - "os" - "path/filepath" "github.com/hashicorp/nomad/nomad/structs" ) @@ -76,40 +74,12 @@ func shuffleStrings(list []string) { } } -// persistState is used to help with saving state -func persistState(path string, data interface{}) error { - buf, err := json.Marshal(data) - if err != nil { - return fmt.Errorf("failed to encode state: %v", err) - } - if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil { - return fmt.Errorf("failed to make dirs for %s: %v", path, err) - } - tmpPath := path + ".tmp" - if err := ioutil.WriteFile(tmpPath, buf, 0600); err != nil { - return fmt.Errorf("failed to save state to tmp: %v", err) - } - if err := os.Rename(tmpPath, path); err != nil { - return fmt.Errorf("failed to rename tmp to path: %v", err) - } - - // Sanity check since users have reported empty state files on disk - if stat, err := os.Stat(path); err != nil { - return fmt.Errorf("unable to stat state file %s: %v", path, err) - } else if stat.Size() == 0 { - return fmt.Errorf("persisted invalid state file %s; see https://github.com/hashicorp/nomad/issues/1367", path) - } - return nil -} - -// restoreState is used to read back in the persisted state -func restoreState(path string, data interface{}) error { +// pre060RestoreState is used to read back in the persisted state for pre v0.6.0 +// state +func pre060RestoreState(path string, data interface{}) error { buf, err := ioutil.ReadFile(path) if err != nil { - if os.IsNotExist(err) { - return nil - } - return fmt.Errorf("failed to read state: %v", err) + return err } if err := json.Unmarshal(buf, data); err != nil { return fmt.Errorf("failed to decode state: %v", err) diff --git a/client/util_test.go b/client/util_test.go index c0a8633c329..7e17220277c 100644 --- a/client/util_test.go +++ b/client/util_test.go @@ -1,9 +1,6 @@ package client import ( - "io/ioutil" - "os" - "path/filepath" "reflect" "testing" @@ -75,42 +72,3 @@ func TestShuffleStrings(t *testing.T) { t.Fatalf("shuffle failed") } } - -func TestPersistRestoreState(t *testing.T) { - t.Parallel() - dir, err := ioutil.TempDir("", "nomad") - if err != nil { - t.Fatalf("err: %s", err) - } - defer os.RemoveAll(dir) - - // Use a state path inside a non-existent directory. This - // verifies that the directory is created properly. - statePath := filepath.Join(dir, "subdir", "test-persist") - - type stateTest struct { - Foo int - Bar string - Baz bool - } - state := stateTest{ - Foo: 42, - Bar: "the quick brown fox", - Baz: true, - } - - err = persistState(statePath, &state) - if err != nil { - t.Fatalf("err: %v", err) - } - - var out stateTest - err = restoreState(statePath, &out) - if err != nil { - t.Fatalf("err: %v", err) - } - - if !reflect.DeepEqual(state, out) { - t.Fatalf("bad: %#v %#v", state, out) - } -} diff --git a/command/agent/consul/int_test.go b/command/agent/consul/int_test.go index fcc9678f811..82357c62a47 100644 --- a/command/agent/consul/int_test.go +++ b/command/agent/consul/int_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/boltdb/bolt" consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/testutil" "github.com/hashicorp/nomad/client" @@ -75,6 +76,15 @@ func TestConsul_Integration(t *testing.T) { } defer os.RemoveAll(conf.AllocDir) + tmp, err := ioutil.TempFile("", "state-db") + if err != nil { + t.Fatalf("error creating state db file: %v", err) + } + db, err := bolt.Open(tmp.Name(), 0600, nil) + if err != nil { + t.Fatalf("error creating state db: %v", err) + } + alloc := mock.Alloc() task := alloc.Job.TaskGroups[0].Tasks[0] task.Driver = "mock_driver" @@ -135,7 +145,7 @@ func TestConsul_Integration(t *testing.T) { serviceClient.Run() close(consulRan) }() - tr := client.NewTaskRunner(logger, conf, logUpdate, taskDir, alloc, task, vclient, serviceClient) + tr := client.NewTaskRunner(logger, conf, db, logUpdate, taskDir, alloc, task, vclient, serviceClient) tr.MarkReceived() go tr.Run() defer func() { diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 7658e662874..ad6847c4e41 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -21,6 +21,7 @@ import ( "github.com/docker/docker/pkg/ioutils" "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/nomad/structs" "github.com/hpcloud/tail/watch" "github.com/ugorji/go/codec" ) @@ -290,7 +291,7 @@ func NewStreamFramer(out io.WriteCloser, plainTxt bool, heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer { // Create a JSON encoder - enc := codec.NewEncoder(out, jsonHandle) + enc := codec.NewEncoder(out, structs.JsonHandle) // Create the heartbeat and flush ticker heartbeat := time.NewTicker(heartbeatRate) diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index 0e9b732547c..2feee179cdd 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/ugorji/go/codec" ) @@ -123,7 +124,7 @@ func TestStreamFramer_Flush(t *testing.T) { sf.Run() // Create a decoder - dec := codec.NewDecoder(r, jsonHandle) + dec := codec.NewDecoder(r, structs.JsonHandle) f := "foo" fe := "bar" @@ -191,7 +192,7 @@ func TestStreamFramer_Batch(t *testing.T) { sf.Run() // Create a decoder - dec := codec.NewDecoder(r, jsonHandle) + dec := codec.NewDecoder(r, structs.JsonHandle) f := "foo" fe := "bar" @@ -268,7 +269,7 @@ func TestStreamFramer_Heartbeat(t *testing.T) { sf.Run() // Create a decoder - dec := codec.NewDecoder(r, jsonHandle) + dec := codec.NewDecoder(r, structs.JsonHandle) // Start the reader resultCh := make(chan struct{}) @@ -320,7 +321,7 @@ func TestStreamFramer_Order(t *testing.T) { sf.Run() // Create a decoder - dec := codec.NewDecoder(r, jsonHandle) + dec := codec.NewDecoder(r, structs.JsonHandle) files := []string{"1", "2", "3", "4", "5"} input := bytes.NewBuffer(make([]byte, 0, 100000)) @@ -592,7 +593,7 @@ func TestHTTP_Stream_Modify(t *testing.T) { r, w := io.Pipe() defer r.Close() defer w.Close() - dec := codec.NewDecoder(r, jsonHandle) + dec := codec.NewDecoder(r, structs.JsonHandle) data := []byte("helloworld") @@ -668,7 +669,7 @@ func TestHTTP_Stream_Truncate(t *testing.T) { r, w := io.Pipe() defer r.Close() defer w.Close() - dec := codec.NewDecoder(r, jsonHandle) + dec := codec.NewDecoder(r, structs.JsonHandle) data := []byte("helloworld") @@ -778,7 +779,7 @@ func TestHTTP_Stream_Delete(t *testing.T) { wrappedW := &WriteCloseChecker{WriteCloser: w} defer r.Close() defer w.Close() - dec := codec.NewDecoder(r, jsonHandle) + dec := codec.NewDecoder(r, structs.JsonHandle) data := []byte("helloworld") @@ -869,7 +870,7 @@ func TestHTTP_Logs_NoFollow(t *testing.T) { wrappedW := &WriteCloseChecker{WriteCloser: w} defer r.Close() defer w.Close() - dec := codec.NewDecoder(r, jsonHandle) + dec := codec.NewDecoder(r, structs.JsonHandle) var received []byte @@ -955,7 +956,7 @@ func TestHTTP_Logs_Follow(t *testing.T) { wrappedW := &WriteCloseChecker{WriteCloser: w} defer r.Close() defer w.Close() - dec := codec.NewDecoder(r, jsonHandle) + dec := codec.NewDecoder(r, structs.JsonHandle) var received []byte @@ -1071,7 +1072,7 @@ func BenchmarkHTTP_Logs_Follow(t *testing.B) { wrappedW := &WriteCloseChecker{WriteCloser: w} defer r.Close() defer w.Close() - dec := codec.NewDecoder(r, jsonHandle) + dec := codec.NewDecoder(r, structs.JsonHandle) var received []byte diff --git a/command/agent/http.go b/command/agent/http.go index bdae5ee337c..d5c7e178985 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -29,18 +29,6 @@ const ( scadaHTTPAddr = "SCADA" ) -var ( - // jsonHandle and jsonHandlePretty are the codec handles to JSON encode - // structs. The pretty handle will add indents for easier human consumption. - jsonHandle = &codec.JsonHandle{ - HTMLCharsAsIs: true, - } - jsonHandlePretty = &codec.JsonHandle{ - HTMLCharsAsIs: true, - Indent: 4, - } -) - // HTTPServer is used to wrap an Agent and expose it over an HTTP interface type HTTPServer struct { agent *Agent @@ -186,6 +174,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) s.mux.HandleFunc("/debug/pprof/profile", pprof.Profile) s.mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + s.mux.HandleFunc("/debug/pprof/trace", pprof.Trace) } } @@ -248,13 +237,13 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque if obj != nil { var buf bytes.Buffer if prettyPrint { - enc := codec.NewEncoder(&buf, jsonHandlePretty) + enc := codec.NewEncoder(&buf, structs.JsonHandlePretty) err = enc.Encode(obj) if err == nil { buf.Write([]byte("\n")) } } else { - enc := codec.NewEncoder(&buf, jsonHandle) + enc := codec.NewEncoder(&buf, structs.JsonHandle) err = enc.Encode(obj) } if err != nil { diff --git a/demo/vagrant/README.md b/demo/vagrant/README.md index 2150799a488..112daa8e4f6 100644 Binary files a/demo/vagrant/README.md and b/demo/vagrant/README.md differ diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 924c39009e0..3826ed784fd 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -4231,6 +4231,18 @@ var MsgpackHandle = func() *codec.MsgpackHandle { return h }() +var ( + // JsonHandle and JsonHandlePretty are the codec handles to JSON encode + // structs. The pretty handle will add indents for easier human consumption. + JsonHandle = &codec.JsonHandle{ + HTMLCharsAsIs: true, + } + JsonHandlePretty = &codec.JsonHandle{ + HTMLCharsAsIs: true, + Indent: 4, + } +) + var HashiMsgpackHandle = func() *hcodec.MsgpackHandle { h := &hcodec.MsgpackHandle{RawToString: true}