Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client persist state using bolt-db and more efficient write patterns #2610

Merged
merged 14 commits into from
May 9, 2017
271 changes: 217 additions & 54 deletions client/alloc_runner.go

Large diffs are not rendered by default.

102 changes: 70 additions & 32 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"text/template"
"time"

"github.com/boltdb/bolt"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
Expand All @@ -36,13 +37,15 @@ func testAllocRunnerFromAlloc(alloc *structs.Allocation, restarts bool) (*MockAl
conf.Node = mock.Node()
conf.StateDir = os.TempDir()
conf.AllocDir = os.TempDir()
tmp, _ := ioutil.TempFile("", "state-db")
db, _ := bolt.Open(tmp.Name(), 0600, nil)
upd := &MockAllocStateUpdater{}
if !restarts {
*alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0}
alloc.Job.Type = structs.JobTypeBatch
}
vclient := vaultclient.NewMockVaultClient()
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient, newMockConsulServiceClient())
ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, newMockConsulServiceClient())
return upd, ar
}

Expand Down Expand Up @@ -171,9 +174,15 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
}

// Check the state still exists
if _, err := os.Stat(ar.stateFilePath()); err != nil {
return false, fmt.Errorf("state file destroyed: %v", err)
// Check the allocation state still exists
if err := ar.stateDB.View(func(tx *bolt.Tx) error {
if !allocationBucketExists(tx, ar.Alloc().ID) {
return fmt.Errorf("no bucket for alloc")
}

return nil
}); err != nil {
return false, fmt.Errorf("state destroyed")
}

// Check the alloc directory still exists
Expand Down Expand Up @@ -201,10 +210,14 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
}

// Check the state was cleaned
if _, err := os.Stat(ar.stateFilePath()); err == nil {
return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
if err := ar.stateDB.View(func(tx *bolt.Tx) error {
if allocationBucketExists(tx, ar.Alloc().ID) {
return fmt.Errorf("bucket for alloc exists")
}

return nil
}); err != nil {
return false, fmt.Errorf("state not destroyed")
}

// Check the alloc directory was cleaned
Expand Down Expand Up @@ -249,10 +262,14 @@ func TestAllocRunner_Destroy(t *testing.T) {
}

// Check the state was cleaned
if _, err := os.Stat(ar.stateFilePath()); err == nil {
return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
if err := ar.stateDB.View(func(tx *bolt.Tx) error {
if allocationBucketExists(tx, ar.Alloc().ID) {
return fmt.Errorf("bucket for alloc exists")
}

return nil
}); err != nil {
return false, fmt.Errorf("state not destroyed")
}

// Check the alloc directory was cleaned
Expand Down Expand Up @@ -324,7 +341,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {

// Create a new alloc runner
l2 := prefixedTestLogger("----- ar2: ")
ar2 := NewAllocRunner(l2, ar.config, upd.Update,
ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient,
ar.consulClient)
err = ar2.RestoreState()
Expand Down Expand Up @@ -368,12 +385,10 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
}

func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)
ar.logger = prefixedTestLogger("ar1: ")

// Ensure task takes some time

ar.alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["run_for"] = "10s"
Expand Down Expand Up @@ -410,14 +425,14 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
}

// Ensure ar1 doesn't recreate the state file
ar.persistLock.Lock()
defer ar.persistLock.Unlock()
ar.allocLock.Lock()
defer ar.allocLock.Unlock()

// Ensure both alloc runners don't destroy
ar.destroy = true

// Create a new alloc runner
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
ar2 := NewAllocRunner(ar.logger, ar.config, ar.stateDB, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient)
ar2.logger = prefixedTestLogger("ar2: ")
err = ar2.RestoreState()
Expand All @@ -429,8 +444,14 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {

testutil.WaitForResult(func() (bool, error) {
// Check the state still exists
if _, err := os.Stat(ar.stateFilePath()); err != nil {
return false, fmt.Errorf("state file destroyed: %v", err)
if err := ar.stateDB.View(func(tx *bolt.Tx) error {
if !allocationBucketExists(tx, ar2.Alloc().ID) {
return fmt.Errorf("no bucket for alloc")
}

return nil
}); err != nil {
return false, fmt.Errorf("state destroyed")
}

// Check the alloc directory still exists
Expand Down Expand Up @@ -459,10 +480,14 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
}

// Check the state was cleaned
if _, err := os.Stat(ar.stateFilePath()); err == nil {
return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
if err := ar.stateDB.View(func(tx *bolt.Tx) error {
if allocationBucketExists(tx, ar2.Alloc().ID) {
return fmt.Errorf("bucket for alloc exists")
}

return nil
}); err != nil {
return false, fmt.Errorf("state not destroyed")
}

// Check the alloc directory was cleaned
Expand Down Expand Up @@ -497,7 +522,14 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) {

// Snapshot state
testutil.WaitForResult(func() (bool, error) {
return len(ar.tasks) == 1, nil
if upd.Count == 0 {
return false, fmt.Errorf("No updates")
}
last := upd.Allocs[upd.Count-1]
if last.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
}
return true, nil
}, func(err error) {
t.Fatalf("task never started: %v", err)
})
Expand All @@ -509,9 +541,7 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) {

// Create a new alloc runner
l2 := prefixedTestLogger("----- ar2: ")
ar2 := NewAllocRunner(l2, origConfig, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient,
ar.consulClient)
ar2 := NewAllocRunner(l2, origConfig, ar.stateDB, upd.Update, &structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient)
err = ar2.RestoreState()
if err != nil {
t.Fatalf("err: %v", err)
Expand All @@ -527,14 +557,14 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) {
return false, nil
}

for _, ev := range ar2.alloc.TaskStates["web"].Events {
for _, ev := range ar2.taskStates["web"].Events {
if strings.HasSuffix(ev.RestartReason, pre06ScriptCheckReason) {
return true, nil
}
}
return false, fmt.Errorf("no restart with proper reason found")
}, func(err error) {
t.Fatalf("err: %v\nAllocs: %#v\nWeb State: %#v", err, upd.Allocs, ar2.alloc.TaskStates["web"])
t.Fatalf("err: %v\nAllocs: %#v\nWeb State: %#v", err, upd.Allocs, ar2.taskStates["web"])
})

// Destroy and wait
Expand Down Expand Up @@ -584,6 +614,14 @@ func TestAllocRunner_RestoreOldState(t *testing.T) {
conf := config.DefaultConfig()
conf.StateDir = os.TempDir()
conf.AllocDir = os.TempDir()
tmp, err := ioutil.TempFile("", "state-db")
if err != nil {
t.Fatalf("error creating state db file: %v", err)
}
db, err := bolt.Open(tmp.Name(), 0600, nil)
if err != nil {
t.Fatalf("error creating state db: %v", err)
}

if err := os.MkdirAll(filepath.Join(conf.StateDir, "alloc", alloc.ID), 0777); err != nil {
t.Fatalf("error creating state dir: %v", err)
Expand Down Expand Up @@ -655,7 +693,7 @@ func TestAllocRunner_RestoreOldState(t *testing.T) {
alloc.Job.Type = structs.JobTypeBatch
vclient := vaultclient.NewMockVaultClient()
cclient := newMockConsulServiceClient()
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient, cclient)
ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, cclient)
defer ar.Destroy()

// RestoreState should fail on the task state since we only test the
Expand All @@ -671,7 +709,7 @@ func TestAllocRunner_RestoreOldState(t *testing.T) {
if len(merr.Errors) != 1 {
t.Fatalf("expected exactly 1 error from RestoreState but found: %d: %v", len(merr.Errors), err)
}
if expected := "task runner snapshot includes nil Task"; merr.Errors[0].Error() != expected {
if expected := "failed to get task bucket"; !strings.Contains(merr.Errors[0].Error(), expected) {
t.Fatalf("expected %q but got: %q", expected, merr.Errors[0].Error())
}

Expand Down
Loading