Commit f36371a

Merge branch 'master' of github.com:hashicorp/nomad

dadgar committed May 11, 2017
2 parents 458c3c5 + 0762f9c

Showing 10 changed files with 1,535 additions and 36 deletions.
102 changes: 102 additions & 0 deletions nomad/fsm.go
@@ -39,6 +39,8 @@ const (
PeriodicLaunchSnapshot
JobSummarySnapshot
VaultAccessorSnapshot
JobVersionSnapshot
DeploymentSnapshot
)
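
A note on the two new snapshot types above: these constants are written into snapshot data as single bytes (see the persist functions later in this file), so existing values must never be renumbered; that is why JobVersionSnapshot and DeploymentSnapshot are appended at the end of the list. A minimal sketch of the pattern, with an illustrative type name and entries rather than Nomad's actual list:

package snapshot

// SnapshotType tags each record written into a persisted snapshot.
type SnapshotType byte

const (
	NodeSnapshot SnapshotType = iota // existing entries keep their byte values...
	JobSnapshot
	EvalSnapshot
	// ...so new entries are only ever appended at the end:
	JobVersionSnapshot
	DeploymentSnapshot
)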

// nomadFSM implements a finite state machine that is used
@@ -153,6 +155,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyUpsertVaultAccessor(buf[1:], log.Index)
case structs.VaultAccessorDegisterRequestType:
return n.applyDeregisterVaultAccessor(buf[1:], log.Index)
case structs.ApplyPlanResultsRequestType:
return n.applyPlanResults(buf[1:], log.Index)
default:
if ignoreUnknown {
n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
@@ -543,6 +547,22 @@ func (n *nomadFSM) applyDeregisterVaultAccessor(buf []byte, index uint64) interf
return nil
}

// applyPlanResults applies the results of a plan application
func (n *nomadFSM) applyPlanResults(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_plan_results"}, time.Now())
var req structs.ApplyPlanResultsRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.UpsertPlanResults(index, &req); err != nil {
n.logger.Printf("[ERR] nomad.fsm: ApplyPlan failed: %v", err)
return err
}

return nil
}
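
The new ApplyPlanResultsRequestType case follows the existing convention in Apply: the first byte of every Raft log entry identifies the message type, and the remainder is the encoded request (Nomad encodes these with msgpack via go-msgpack/codec). A standalone sketch of that framing, using JSON and invented names purely for illustration:

package main

import (
	"encoding/json"
	"fmt"
)

type MessageType uint8

const ApplyPlanResultsRequestType MessageType = 13 // value is illustrative

// apply mimics the nomadFSM.Apply dispatch: peel off the type byte,
// then decode the remaining payload into the matching request struct.
func apply(buf []byte) interface{} {
	switch MessageType(buf[0]) {
	case ApplyPlanResultsRequestType:
		var req struct{ EvalID string } // stand-in request struct
		if err := json.Unmarshal(buf[1:], &req); err != nil {
			// A decode failure means a corrupt log entry, so the real FSM panics too.
			panic(fmt.Errorf("failed to decode request: %v", err))
		}
		return nil
	}
	return fmt.Errorf("unknown message type %d", buf[0])
}

func main() {
	payload, _ := json.Marshal(struct{ EvalID string }{"eval-1"})
	entry := append([]byte{byte(ApplyPlanResultsRequestType)}, payload...)
	fmt.Println(apply(entry)) // <nil>
}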

func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
// Create a new snapshot
snap, err := n.state.Snapshot()
@@ -680,6 +700,24 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return err
}

case JobVersionSnapshot:
version := new(structs.Job)
if err := dec.Decode(version); err != nil {
return err
}
if err := restore.JobVersionRestore(version); err != nil {
return err
}

case DeploymentSnapshot:
deployment := new(structs.Deployment)
if err := dec.Decode(deployment); err != nil {
return err
}
if err := restore.DeploymentRestore(deployment); err != nil {
return err
}

default:
return fmt.Errorf("Unrecognized snapshot type: %v", msgType)
}
@@ -878,6 +916,14 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
sink.Cancel()
return err
}
if err := s.persistJobVersions(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistDeployments(sink, encoder); err != nil {
sink.Cancel()
return err
}
return nil
}

@@ -1098,6 +1144,62 @@ func (s *nomadSnapshot) persistVaultAccessors(sink raft.SnapshotSink,
return nil
}

func (s *nomadSnapshot) persistJobVersions(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the job versions
ws := memdb.NewWatchSet()
versions, err := s.snap.JobVersions(ws)
if err != nil {
return err
}

for {
// Get the next item
raw := versions.Next()
if raw == nil {
break
}

// Cast to the job struct
job := raw.(*structs.Job)

// Write out a job version snapshot entry
sink.Write([]byte{byte(JobVersionSnapshot)})
if err := encoder.Encode(job); err != nil {
return err
}
}
return nil
}

func (s *nomadSnapshot) persistDeployments(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the deployments
ws := memdb.NewWatchSet()
deployments, err := s.snap.Deployments(ws)
if err != nil {
return err
}

for {
// Get the next item
raw := deployments.Next()
if raw == nil {
break
}

// Cast to the deployment struct
deployment := raw.(*structs.Deployment)

// Write out a deployment snapshot entry
sink.Write([]byte{byte(DeploymentSnapshot)})
if err := encoder.Encode(deployment); err != nil {
return err
}
}
return nil
}
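
Both persist functions above use the same framing: one tag byte naming the record type, followed by the encoded struct, which is exactly what the new JobVersionSnapshot and DeploymentSnapshot cases in Restore peel back off. A self-contained round-trip sketch, with JSON and a bytes.Buffer standing in for Nomad's msgpack encoder and the raft.SnapshotSink:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

const DeploymentSnapshot byte = 11 // value is illustrative

type Deployment struct {
	ID     string
	Status string
}

func main() {
	var sink bytes.Buffer // stands in for raft.SnapshotSink

	// Persist: write one tag byte, then the encoded record,
	// as persistDeployments does above.
	enc := json.NewEncoder(&sink)
	sink.WriteByte(DeploymentSnapshot)
	enc.Encode(&Deployment{ID: "d1", Status: "running"})

	// Restore: read the tag back, decode the matching type, dispatch,
	// as the Restore switch does earlier in this diff.
	tag, _ := sink.ReadByte()
	dec := json.NewDecoder(&sink)
	switch tag {
	case DeploymentSnapshot:
		d := new(Deployment)
		if err := dec.Decode(d); err != nil {
			panic(err)
		}
		fmt.Printf("restored %+v\n", d)
	}
}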

// Release is a no-op, as we just need to GC the pointer
// to the state store snapshot. There is nothing to explicitly
// cleanup.
143 changes: 143 additions & 0 deletions nomad/fsm_test.go
@@ -1325,6 +1325,56 @@ func TestFSM_SnapshotRestore_VaultAccessors(t *testing.T) {
}
}

func TestFSM_SnapshotRestore_JobVersions(t *testing.T) {
// Add some state
fsm := testFSM(t)
state := fsm.State()
job1 := mock.Job()
state.UpsertJob(1000, job1)
job2 := mock.Job()
job2.ID = job1.ID
state.UpsertJob(1001, job2)

// Verify the contents
ws := memdb.NewWatchSet()
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
out1, _ := state2.JobByIDAndVersion(ws, job1.ID, job1.Version)
out2, _ := state2.JobByIDAndVersion(ws, job2.ID, job2.Version)
if !reflect.DeepEqual(job1, out1) {
t.Fatalf("bad: \n%#v\n%#v", out1, job1)
}
if !reflect.DeepEqual(job2, out2) {
t.Fatalf("bad: \n%#v\n%#v", out2, job2)
}
if job2.Version != 1 {
t.Fatalf("bad: \n%#v\n%#v", 1, job2)
}
}

func TestFSM_SnapshotRestore_Deployments(t *testing.T) {
// Add some state
fsm := testFSM(t)
state := fsm.State()
d1 := mock.Deployment()
d2 := mock.Deployment()
state.UpsertDeployment(1000, d1, false)
state.UpsertDeployment(1001, d2, false)

// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
ws := memdb.NewWatchSet()
out1, _ := state2.DeploymentByID(ws, d1.ID)
out2, _ := state2.DeploymentByID(ws, d2.ID)
if !reflect.DeepEqual(d1, out1) {
t.Fatalf("bad: \n%#v\n%#v", out1, d1)
}
if !reflect.DeepEqual(d2, out2) {
t.Fatalf("bad: \n%#v\n%#v", out2, d2)
}
}

func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) {
// Add some state
fsm := testFSM(t)
@@ -1434,3 +1484,96 @@ func TestFSM_ReconcileSummaries(t *testing.T) {
t.Fatalf("expected: %#v, actual: %#v", &expected, out2)
}
}

func TestFSM_ApplyPlanResults(t *testing.T) {
fsm := testFSM(t)

// Create a deployment and an alloc that references it
alloc := mock.Alloc()
job := alloc.Job
alloc.Job = nil

d := mock.Deployment()
d.JobID = job.ID
d.JobModifyIndex = job.ModifyIndex
d.JobVersion = job.Version

alloc.DeploymentID = d.ID

fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
req := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Job: job,
Alloc: []*structs.Allocation{alloc},
},
CreatedDeployment: d,
}
buf, err := structs.Encode(structs.ApplyPlanResultsRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}

resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}

// Verify the allocation is registered
ws := memdb.NewWatchSet()
out, err := fsm.State().AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
alloc.CreateIndex = out.CreateIndex
alloc.ModifyIndex = out.ModifyIndex
alloc.AllocModifyIndex = out.AllocModifyIndex

// Job should be re-attached
alloc.Job = job
if !reflect.DeepEqual(alloc, out) {
t.Fatalf("bad: %#v %#v", alloc, out)
}

dout, err := fsm.State().DeploymentByID(ws, d.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if tg, ok := dout.TaskGroups[alloc.TaskGroup]; !ok || tg.PlacedAllocs != 1 {
t.Fatalf("err: %v %v", tg, err)
}

// Ensure that the original job is used
evictAlloc := alloc.Copy()
job = mock.Job()
job.Priority = 123

evictAlloc.Job = nil
evictAlloc.DesiredStatus = structs.AllocDesiredStatusEvict
req2 := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Job: job,
Alloc: []*structs.Allocation{evictAlloc},
},
}
buf, err = structs.Encode(structs.ApplyPlanResultsRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}

resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}

// Verify we are evicted
out, err = fsm.State().AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out.DesiredStatus != structs.AllocDesiredStatusEvict {
t.Fatalf("alloc not evicted: %v", out.DesiredStatus)
}
if out.Job == nil || out.Job.Priority == 123 {
t.Fatalf("bad job")
}
}
2 changes: 1 addition & 1 deletion nomad/job_endpoint.go
@@ -559,7 +559,7 @@ func (j *Job) GetJobVersions(args *structs.JobSpecificRequest,
reply.Index = out[0].ModifyIndex
} else {
// Use the last index that affected the job_version table
index, err := state.Index("job_versions")
index, err := state.Index("job_version")
if err != nil {
return err
}
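
The one-character table-name fix above matters because blocking queries fall back to the latest Raft index recorded per table, keyed by the table's exact name. A toy sketch of the failure mode, with a plain map standing in for Nomad's state store index table:

package main

import "fmt"

// indexes maps a table name to the highest Raft index that modified it,
// mimicking what StateStore.Index looks up.
var indexes = map[string]uint64{"job_version": 1042}

func main() {
	// The misspelled plural name silently yields 0, so a blocking
	// query would compute its reply index from the wrong watermark.
	fmt.Println(indexes["job_versions"]) // 0
	fmt.Println(indexes["job_version"])  // 1042
}
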
19 changes: 19 additions & 0 deletions nomad/mock/mock.go
@@ -308,6 +308,25 @@ func VaultAccessor() *structs.VaultAccessor {
}
}

func Deployment() *structs.Deployment {
return &structs.Deployment{
ID: structs.GenerateUUID(),
JobID: structs.GenerateUUID(),
JobVersion: 2,
JobModifyIndex: 20,
JobCreateIndex: 18,
TaskGroups: map[string]*structs.DeploymentState{
"web": &structs.DeploymentState{
DesiredTotal: 10,
},
},
Status: structs.DeploymentStatusRunning,
StatusDescription: structs.DeploymentStatusRunning,
ModifyIndex: 23,
CreateIndex: 21,
}
}

func Plan() *structs.Plan {
return &structs.Plan{
Priority: 50,
26 changes: 14 additions & 12 deletions nomad/plan_apply.go
@@ -106,7 +106,7 @@ func (s *Server) planApply() {
}

// Dispatch the Raft transaction for the plan
- future, err := s.applyPlan(pending.plan.Job, result, snap)
+ future, err := s.applyPlan(pending.plan, result, snap)
if err != nil {
s.logger.Printf("[ERR] nomad: failed to submit plan: %v", err)
pending.respond(nil, err)
@@ -120,16 +120,23 @@
}

// applyPlan is used to apply the plan result and to return the alloc index
- func (s *Server) applyPlan(job *structs.Job, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
+ func (s *Server) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
// Determine the minimum number of updates; there could be more if there
// are multiple updates per node
minUpdates := len(result.NodeUpdate)
minUpdates += len(result.NodeAllocation)

// Grab the job
job := plan.Job

// Setup the update request
- req := structs.AllocUpdateRequest{
- Job:   job,
- Alloc: make([]*structs.Allocation, 0, minUpdates),
+ req := structs.ApplyPlanResultsRequest{
+ AllocUpdateRequest: structs.AllocUpdateRequest{
+ Job:   job,
+ Alloc: make([]*structs.Allocation, 0, minUpdates),
+ },
+ CreatedDeployment: plan.CreatedDeployment,
+ DeploymentUpdates: plan.DeploymentUpdates,
}
for _, updateList := range result.NodeUpdate {
req.Alloc = append(req.Alloc, updateList...)
@@ -148,20 +155,15 @@ func (s *Server) applyPlan(job *structs.Job, result *structs.PlanResult, snap *s
}

// Dispatch the Raft transaction
- future, err := s.raftApplyFuture(structs.AllocUpdateRequestType, &req)
+ future, err := s.raftApplyFuture(structs.ApplyPlanResultsRequestType, &req)
if err != nil {
return nil, err
}

// Optimistically apply to our state view
if snap != nil {
- // Attach the job to all the allocations. It is pulled out in the
- // payload to avoid the redundancy of encoding, but should be denormalized
- // prior to being inserted into MemDB.
- structs.DenormalizeAllocationJobs(req.Job, req.Alloc)
-
nextIdx := s.raft.AppliedIndex() + 1
- if err := snap.UpsertAllocs(nextIdx, req.Alloc); err != nil {
+ if err := snap.UpsertPlanResults(nextIdx, &req); err != nil {
return future, err
}
}
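
Two things happen above: the request is dispatched through Raft with the new message type, and the leader optimistically applies the same result to its local state snapshot so that later plans in the queue are evaluated against up-to-date state. A hedged sketch of the dispatch half using hashicorp/raft's public API; the applyMsg helper here is invented for illustration (Nomad's real helper is raftApplyFuture):

package plan

import (
	"time"

	"github.com/hashicorp/raft"
)

// applyMsg frames a request the way Nomad does, one message-type byte
// followed by the encoded payload, and hands it to Raft. The returned
// future lets the caller block on commit (future.Error()) or read the
// assigned log index (future.Index()).
func applyMsg(r *raft.Raft, msgType byte, payload []byte) raft.ApplyFuture {
	buf := make([]byte, 0, len(payload)+1)
	buf = append(buf, msgType)
	buf = append(buf, payload...)
	return r.Apply(buf, 30*time.Second)
}
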
(The remaining 5 changed files are not shown.)
