Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client persist state using bolt-db and more efficient write patterns #2610

Merged
merged 14 commits into from
May 9, 2017
234 changes: 189 additions & 45 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/boltdb/bolt"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
Expand All @@ -27,6 +28,13 @@ const (
taskReceivedSyncLimit = 30 * time.Second
)

var (
// The following are the key paths written to the state database
allocRunnerStateImmutableKey = []byte("immutable")
allocRunnerStateMutableKey = []byte("mutable")
allocRunnerStateAllocDirKey = []byte("alloc-dir")
)

// AllocStateUpdater is used to update the status of an allocation
type AllocStateUpdater func(alloc *structs.Allocation)

Expand Down Expand Up @@ -69,10 +77,18 @@ type AllocRunner struct {
destroyLock sync.Mutex
waitCh chan struct{}

// serialize saveAllocRunnerState calls
persistLock sync.Mutex
// State related fields
// stateDB is used to store the alloc runners state
stateDB *bolt.DB

// immutablePersisted and allocDirPersisted are used to track whether the
// immutable data and the alloc dir have been persisted. Once persisted we
// can lower write volume by not re-writing these values
immutablePersisted bool
allocDirPersisted bool
}

// COMPAT: Remove in 0.7.0
// allocRunnerState is used to snapshot the state of the alloc runner
type allocRunnerState struct {
Version string
Expand All @@ -95,13 +111,29 @@ type allocRunnerState struct {
} `json:"Context,omitempty"`
}

// allocRunnerImmutableState is state that only has to be written once as it
// doesn't change over the life-cycle of the alloc_runner.
type allocRunnerImmutableState struct {
Version string
Alloc *structs.Allocation
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

r.alloc is mutable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have pulled the mutable parts of it into the allocRunnerMutableState. You can think of this one as the allocation received by the client initially

}

// allocRunnerMutableState is state that has to be written on each save as it
// changes over the life-cycle of the alloc_runner.
type allocRunnerMutableState struct {
AllocClientStatus string
AllocClientDescription string
TaskStates map[string]*structs.TaskState
}

// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater,
func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, updater AllocStateUpdater,
alloc *structs.Allocation, vaultClient vaultclient.VaultClient,
consulClient ConsulServiceAPI) *AllocRunner {

ar := &AllocRunner{
config: config,
stateDB: stateDB,
updater: updater,
logger: logger,
alloc: alloc,
Expand All @@ -118,8 +150,10 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStat
return ar
}

// stateFilePath returns the path to our state file
func (r *AllocRunner) stateFilePath() string {
// pre060StateFilePath returns the path to our state file that would have been
// written pre v0.6.0
// COMPAT: Remove in 0.7.0
func (r *AllocRunner) pre060StateFilePath() string {
r.allocLock.Lock()
defer r.allocLock.Unlock()
path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, "state.json")
Expand All @@ -128,28 +162,74 @@ func (r *AllocRunner) stateFilePath() string {

// RestoreState is used to restore the state of the alloc runner
func (r *AllocRunner) RestoreState() error {
// Load the snapshot

// Check if the old snapshot is there
oldPath := r.pre060StateFilePath()
var snap allocRunnerState
if err := restoreState(r.stateFilePath(), &snap); err != nil {
if err := pre060RestoreState(oldPath, &snap); err == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should check for the new state location first and only fallback if it doesn't exist to avoid races where we delete the old state files and crash before writing the new version.

// Restore fields
r.logger.Printf("[DEBUG] client: restoring pre v0.6.0 alloc runner state for alloc %q", r.alloc.ID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] I wonder if we should elevate this to [INFO] since users should only see it once-per-alloc when upgrading and won't have an opportunity to bump their log level from INFO to DEBUG if they need this log line to debug something after-the-fact. (Although whether or not this code was hit should be apparent from other factors as well - so w/e)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

r.alloc = snap.Alloc
r.allocDir = snap.AllocDir
r.allocClientStatus = snap.AllocClientStatus
r.allocClientDescription = snap.AllocClientDescription

if r.alloc != nil {
r.taskStates = snap.Alloc.TaskStates
}

// #2132 Upgrade path: if snap.AllocDir is nil, try to convert old
// Context struct to new AllocDir struct
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, this migration was added in the 0.5.x release cycle and therefore can be removed in 0.7. If you're in this code again mind adding // TODO Remove in 0.7 for me? Sorry!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

if snap.AllocDir == nil && snap.Context != nil {
r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.alloc.ID)
r.allocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir)
for taskName := range snap.Context.AllocDir.TaskDirs {
r.allocDir.NewTaskDir(taskName)
}
}

// Delete the old state
os.RemoveAll(oldPath)
} else if !os.IsNotExist(err) {
// Something corrupt in the old state file
return err
}
} else {
// We are doing a normal restore
err := r.stateDB.View(func(tx *bolt.Tx) error {
bkt, err := getAllocationBucket(tx, r.alloc.ID)
if err != nil {
return fmt.Errorf("failed to get allocation bucket: %v", err)
}

// Get the state objects
var mutable allocRunnerMutableState
var immutable allocRunnerImmutableState
var allocDir allocdir.AllocDir

if err := getObject(bkt, allocRunnerStateImmutableKey, &immutable); err != nil {
return fmt.Errorf("failed to read alloc runner immutable state: %v", err)
}
if err := getObject(bkt, allocRunnerStateMutableKey, &mutable); err != nil {
return fmt.Errorf("failed to read alloc runner mutable state: %v", err)
}
if err := getObject(bkt, allocRunnerStateAllocDirKey, &allocDir); err != nil {
return fmt.Errorf("failed to read alloc runner alloc_dir state: %v", err)
}

// #2132 Upgrade path: if snap.AllocDir is nil, try to convert old
// Context struct to new AllocDir struct
if snap.AllocDir == nil && snap.Context != nil {
r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.alloc.ID)
snap.AllocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir)
for taskName := range snap.Context.AllocDir.TaskDirs {
snap.AllocDir.NewTaskDir(taskName)
// Populate the fields
r.alloc = immutable.Alloc
r.allocDir = &allocDir
r.allocClientStatus = mutable.AllocClientStatus
r.allocClientDescription = mutable.AllocClientDescription
r.taskStates = mutable.TaskStates
return nil
})

if err != nil {
return fmt.Errorf("failed to read allocation state: %v", err)
}
}

// Restore fields
r.alloc = snap.Alloc
r.allocDir = snap.AllocDir
r.allocClientStatus = snap.AllocClientStatus
r.allocClientDescription = snap.AllocClientDescription

var snapshotErrors multierror.Error
if r.alloc == nil {
snapshotErrors.Errors = append(snapshotErrors.Errors, fmt.Errorf("alloc_runner snapshot includes a nil allocation"))
Expand All @@ -161,11 +241,17 @@ func (r *AllocRunner) RestoreState() error {
return e
}

r.taskStates = snap.Alloc.TaskStates
tg := r.alloc.Job.LookupTaskGroup(r.alloc.TaskGroup)
if tg == nil {
return fmt.Errorf("restored allocation doesn't contain task group %q", r.alloc.TaskGroup)
}

// Restore the task runners
var mErr multierror.Error
for name, state := range r.taskStates {
for _, task := range tg.Tasks {
name := task.Name
state := r.taskStates[name]

// Mark the task as restored.
r.restored[name] = struct{}{}

Expand All @@ -177,8 +263,7 @@ func (r *AllocRunner) RestoreState() error {
return err
}

task := &structs.Task{Name: name}
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient)
tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient)
r.tasks[name] = tr

// Skip tasks in terminal states.
Expand All @@ -198,11 +283,6 @@ func (r *AllocRunner) RestoreState() error {
return mErr.ErrorOrNil()
}

// GetAllocDir returns the alloc dir for the alloc runner
func (r *AllocRunner) GetAllocDir() *allocdir.AllocDir {
return r.allocDir
}

// SaveState is used to snapshot the state of the alloc runner
// if the fullSync is marked as false only the state of the Alloc Runner
// is snapshotted. If fullSync is marked as true, we snapshot
Expand All @@ -224,10 +304,7 @@ func (r *AllocRunner) SaveState() error {
}

func (r *AllocRunner) saveAllocRunnerState() error {
r.persistLock.Lock()
defer r.persistLock.Unlock()

// Create the snapshot.
// Grab all the relevant data
alloc := r.Alloc()

r.allocLock.Lock()
Expand All @@ -239,14 +316,55 @@ func (r *AllocRunner) saveAllocRunnerState() error {
allocDir := r.allocDir
r.allocDirLock.Unlock()

snap := allocRunnerState{
Version: r.config.Version,
Alloc: alloc,
AllocDir: allocDir,
AllocClientStatus: allocClientStatus,
AllocClientDescription: allocClientDescription,
}
return persistState(r.stateFilePath(), &snap)
// Start the transaction.
return r.stateDB.Batch(func(tx *bolt.Tx) error {

// Grab the allocation bucket
allocBkt, err := getAllocationBucket(tx, r.alloc.ID)
if err != nil {
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
}

// Write the immutable data
if !r.immutablePersisted {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will skip saving updated allocs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See earlier comment. This is how we get a lot of the efficiency gain.

immutable := &allocRunnerImmutableState{
Alloc: alloc,
Version: r.config.Version,
}

if err := putObject(allocBkt, allocRunnerStateImmutableKey, &immutable); err != nil {
return fmt.Errorf("failed to write alloc_runner immutable state: %v", err)
}

tx.OnCommit(func() {
r.immutablePersisted = true
})
}

// Write the alloc dir data if it hasn't been written before and it exists.
if !r.allocDirPersisted && r.allocDir != nil {
if err := putObject(allocBkt, allocRunnerStateAllocDirKey, allocDir); err != nil {
return fmt.Errorf("failed to write alloc_runner allocDir state: %v", err)
}

tx.OnCommit(func() {
r.allocDirPersisted = true
})
}

// Write the mutable state every time
mutable := &allocRunnerMutableState{
AllocClientStatus: allocClientStatus,
AllocClientDescription: allocClientDescription,
TaskStates: alloc.TaskStates,
}

if err := putObject(allocBkt, allocRunnerStateMutableKey, &mutable); err != nil {
return fmt.Errorf("failed to write alloc_runner mutable state: %v", err)
}

return nil
})
}

func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error {
Expand All @@ -259,14 +377,24 @@ func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error {

// DestroyState is used to cleanup after ourselves
func (r *AllocRunner) DestroyState() error {
return os.RemoveAll(filepath.Dir(r.stateFilePath()))
return r.stateDB.Update(func(tx *bolt.Tx) error {
if err := deleteAllocationBucket(tx, r.alloc.ID); err != nil {
return fmt.Errorf("failed to delete allocation bucket: %v", err)
}
return nil
})
}

// DestroyContext is used to destroy the context
func (r *AllocRunner) DestroyContext() error {
return r.allocDir.Destroy()
}

// GetAllocDir returns the alloc dir for the alloc runner
func (r *AllocRunner) GetAllocDir() *allocdir.AllocDir {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know you just moved this, but it made me look around. It seems this doesn't need exporting, and that it should acquire the AllocRunner.allocDirLock. We seem to hold that lock inconsistently though (see DestroyContext above), and the various alloc*Locks make me scared we handle AllocRunner state non-atomically.

Feel free to consider out of scope for this PR as I don't think you've altered this particular locking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used by the client for the method: GetAllocFS which is used for the FS commands.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly that lock should be removed. It doesn't really protect much.

return r.allocDir
}

// copyTaskStates returns a copy of the passed task states.
func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.TaskState {
copy := make(map[string]*structs.TaskState, len(states))
Expand All @@ -279,8 +407,20 @@ func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.Ta
// Alloc returns the associated allocation
func (r *AllocRunner) Alloc() *structs.Allocation {
r.allocLock.Lock()

// Clear the job before copying
job := r.alloc.Job

// Since we are clearing the job, anything that access the alloc.Job field
// must acquire the lock or access it via this method.
r.alloc.Job = nil

alloc := r.alloc.Copy()

// Restore
r.alloc.Job = job
alloc.Job = job

// The status has explicitly been set.
if r.allocClientStatus != "" || r.allocClientDescription != "" {
alloc.ClientStatus = r.allocClientStatus
Expand Down Expand Up @@ -463,7 +603,7 @@ func (r *AllocRunner) Run() {
go r.dirtySyncState()

// Find the task group to run in the allocation
alloc := r.alloc
alloc := r.Alloc()
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
Expand Down Expand Up @@ -516,7 +656,7 @@ func (r *AllocRunner) Run() {
taskdir := r.allocDir.NewTaskDir(task.Name)
r.allocDirLock.Unlock()

tr := NewTaskRunner(r.logger, r.config, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient, r.consulClient)
tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient, r.consulClient)
r.tasks[task.Name] = tr
tr.MarkReceived()

Expand Down Expand Up @@ -549,6 +689,10 @@ OUTER:
for _, tr := range runners {
tr.Update(update)
}

if err := r.syncStatus(); err != nil {
r.logger.Printf("[WARN] client: failed to sync status upon receiving alloc update: %v", err)
}
case <-r.destroyCh:
taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled)
break OUTER
Expand Down
Loading