Skip to content

Commit

Permalink
Merge pull request #2610 from hashicorp/f-bolt-db
Browse files Browse the repository at this point in the history
Client persist state using bolt-db and more efficient write patterns
  • Loading branch information
dadgar authored May 9, 2017
2 parents bdaa2ba + 997390b commit d11ca57
Show file tree
Hide file tree
Showing 15 changed files with 777 additions and 234 deletions.
271 changes: 217 additions & 54 deletions client/alloc_runner.go

Large diffs are not rendered by default.

102 changes: 70 additions & 32 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"text/template"
"time"

"github.com/boltdb/bolt"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
Expand All @@ -36,13 +37,15 @@ func testAllocRunnerFromAlloc(alloc *structs.Allocation, restarts bool) (*MockAl
conf.Node = mock.Node()
conf.StateDir = os.TempDir()
conf.AllocDir = os.TempDir()
tmp, _ := ioutil.TempFile("", "state-db")
db, _ := bolt.Open(tmp.Name(), 0600, nil)
upd := &MockAllocStateUpdater{}
if !restarts {
*alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0}
alloc.Job.Type = structs.JobTypeBatch
}
vclient := vaultclient.NewMockVaultClient()
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient, newMockConsulServiceClient())
ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, newMockConsulServiceClient())
return upd, ar
}

Expand Down Expand Up @@ -171,9 +174,15 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
}

// Check the state still exists
if _, err := os.Stat(ar.stateFilePath()); err != nil {
return false, fmt.Errorf("state file destroyed: %v", err)
// Check the allocation state still exists
if err := ar.stateDB.View(func(tx *bolt.Tx) error {
if !allocationBucketExists(tx, ar.Alloc().ID) {
return fmt.Errorf("no bucket for alloc")
}

return nil
}); err != nil {
return false, fmt.Errorf("state destroyed")
}

// Check the alloc directory still exists
Expand Down Expand Up @@ -201,10 +210,14 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
}

// Check the state was cleaned
if _, err := os.Stat(ar.stateFilePath()); err == nil {
return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
if err := ar.stateDB.View(func(tx *bolt.Tx) error {
if allocationBucketExists(tx, ar.Alloc().ID) {
return fmt.Errorf("bucket for alloc exists")
}

return nil
}); err != nil {
return false, fmt.Errorf("state not destroyed")
}

// Check the alloc directory was cleaned
Expand Down Expand Up @@ -249,10 +262,14 @@ func TestAllocRunner_Destroy(t *testing.T) {
}

// Check the state was cleaned
if _, err := os.Stat(ar.stateFilePath()); err == nil {
return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
if err := ar.stateDB.View(func(tx *bolt.Tx) error {
if allocationBucketExists(tx, ar.Alloc().ID) {
return fmt.Errorf("bucket for alloc exists")
}

return nil
}); err != nil {
return false, fmt.Errorf("state not destroyed")
}

// Check the alloc directory was cleaned
Expand Down Expand Up @@ -324,7 +341,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {

// Create a new alloc runner
l2 := prefixedTestLogger("----- ar2: ")
ar2 := NewAllocRunner(l2, ar.config, upd.Update,
ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient,
ar.consulClient)
err = ar2.RestoreState()
Expand Down Expand Up @@ -368,12 +385,10 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
}

func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)
ar.logger = prefixedTestLogger("ar1: ")

// Ensure task takes some time

ar.alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["run_for"] = "10s"
Expand Down Expand Up @@ -410,14 +425,14 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
}

// Ensure ar1 doesn't recreate the state file
ar.persistLock.Lock()
defer ar.persistLock.Unlock()
ar.allocLock.Lock()
defer ar.allocLock.Unlock()

// Ensure both alloc runners don't destroy
ar.destroy = true

// Create a new alloc runner
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
ar2 := NewAllocRunner(ar.logger, ar.config, ar.stateDB, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient)
ar2.logger = prefixedTestLogger("ar2: ")
err = ar2.RestoreState()
Expand All @@ -429,8 +444,14 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {

testutil.WaitForResult(func() (bool, error) {
// Check the state still exists
if _, err := os.Stat(ar.stateFilePath()); err != nil {
return false, fmt.Errorf("state file destroyed: %v", err)
if err := ar.stateDB.View(func(tx *bolt.Tx) error {
if !allocationBucketExists(tx, ar2.Alloc().ID) {
return fmt.Errorf("no bucket for alloc")
}

return nil
}); err != nil {
return false, fmt.Errorf("state destroyed")
}

// Check the alloc directory still exists
Expand Down Expand Up @@ -459,10 +480,14 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
}

// Check the state was cleaned
if _, err := os.Stat(ar.stateFilePath()); err == nil {
return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
if err := ar.stateDB.View(func(tx *bolt.Tx) error {
if allocationBucketExists(tx, ar2.Alloc().ID) {
return fmt.Errorf("bucket for alloc exists")
}

return nil
}); err != nil {
return false, fmt.Errorf("state not destroyed")
}

// Check the alloc directory was cleaned
Expand Down Expand Up @@ -497,7 +522,14 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) {

// Snapshot state
testutil.WaitForResult(func() (bool, error) {
return len(ar.tasks) == 1, nil
if upd.Count == 0 {
return false, fmt.Errorf("No updates")
}
last := upd.Allocs[upd.Count-1]
if last.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
}
return true, nil
}, func(err error) {
t.Fatalf("task never started: %v", err)
})
Expand All @@ -509,9 +541,7 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) {

// Create a new alloc runner
l2 := prefixedTestLogger("----- ar2: ")
ar2 := NewAllocRunner(l2, origConfig, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient,
ar.consulClient)
ar2 := NewAllocRunner(l2, origConfig, ar.stateDB, upd.Update, &structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient)
err = ar2.RestoreState()
if err != nil {
t.Fatalf("err: %v", err)
Expand All @@ -527,14 +557,14 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) {
return false, nil
}

for _, ev := range ar2.alloc.TaskStates["web"].Events {
for _, ev := range ar2.taskStates["web"].Events {
if strings.HasSuffix(ev.RestartReason, pre06ScriptCheckReason) {
return true, nil
}
}
return false, fmt.Errorf("no restart with proper reason found")
}, func(err error) {
t.Fatalf("err: %v\nAllocs: %#v\nWeb State: %#v", err, upd.Allocs, ar2.alloc.TaskStates["web"])
t.Fatalf("err: %v\nAllocs: %#v\nWeb State: %#v", err, upd.Allocs, ar2.taskStates["web"])
})

// Destroy and wait
Expand Down Expand Up @@ -584,6 +614,14 @@ func TestAllocRunner_RestoreOldState(t *testing.T) {
conf := config.DefaultConfig()
conf.StateDir = os.TempDir()
conf.AllocDir = os.TempDir()
tmp, err := ioutil.TempFile("", "state-db")
if err != nil {
t.Fatalf("error creating state db file: %v", err)
}
db, err := bolt.Open(tmp.Name(), 0600, nil)
if err != nil {
t.Fatalf("error creating state db: %v", err)
}

if err := os.MkdirAll(filepath.Join(conf.StateDir, "alloc", alloc.ID), 0777); err != nil {
t.Fatalf("error creating state dir: %v", err)
Expand Down Expand Up @@ -655,7 +693,7 @@ func TestAllocRunner_RestoreOldState(t *testing.T) {
alloc.Job.Type = structs.JobTypeBatch
vclient := vaultclient.NewMockVaultClient()
cclient := newMockConsulServiceClient()
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient, cclient)
ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, cclient)
defer ar.Destroy()

// RestoreState should fail on the task state since we only test the
Expand All @@ -671,7 +709,7 @@ func TestAllocRunner_RestoreOldState(t *testing.T) {
if len(merr.Errors) != 1 {
t.Fatalf("expected exactly 1 error from RestoreState but found: %d: %v", len(merr.Errors), err)
}
if expected := "task runner snapshot includes nil Task"; merr.Errors[0].Error() != expected {
if expected := "failed to get task bucket"; !strings.Contains(merr.Errors[0].Error(), expected) {
t.Fatalf("expected %q but got: %q", expected, merr.Errors[0].Error())
}

Expand Down
Loading

0 comments on commit d11ca57

Please sign in to comment.