Skip to content

Commit

Permalink
Clean up leaked deployments on restoration
Browse files Browse the repository at this point in the history
This PR cancels deployments that are active but do not have a job
associated with them. This is a broken invariant that causes issues in
the deployment watcher since it will not track them. Thus they are
objects that can't be operated on or cleaned up.

Fixes #4286
  • Loading branch information
dadgar committed May 23, 2018
1 parent 7b18e44 commit 689d396
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 0 deletions.
59 changes: 59 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,12 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
}
}

// COMPAT Remove in 0.10
// Clean up active deployments that do not have a job
if err := n.failLeakedDeployments(newState); err != nil {
return err
}

// External code might be calling State(), so we need to synchronize
// here to make sure we swap in the new state store atomically.
n.stateLock.Lock()
Expand All @@ -1276,6 +1282,59 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return nil
}

// failLeakedDeployments is used to fail deployments that do not have a job.
// This state is a broken invariant that should not occur since 0.8.X.
func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error {
// Scan for deployments that are referencing a job that no longer exists.
// This could happen if multiple deployments were created for a given job
// and thus the older deployment leaks and then the job is removed.
iter, err := state.Deployments(nil)
if err != nil {
return fmt.Errorf("failed to query deployments: %v", err)
}

dindex, err := state.Index("deployment")
if err != nil {
return fmt.Errorf("couldn't fetch index of deployments table: %v", err)
}

for {
raw := iter.Next()
if raw == nil {
break
}

d := raw.(*structs.Deployment)

// We are only looking for active deployments where the job no longer
// exists
if !d.Active() {
continue
}

// Find the job
job, err := state.JobByID(nil, d.Namespace, d.JobID)
if err != nil {
return fmt.Errorf("failed to lookup job %s from deployment %q: %v", d.JobID, d.ID, err)
}

// Job exists.
if job != nil {
continue
}

// Update the deployment to be terminal
failed := d.Copy()
failed.Status = structs.DeploymentStatusCancelled
failed.StatusDescription = structs.DeploymentStatusDescriptionStoppedJob
if err := state.UpsertDeployment(dindex, failed); err != nil {
return fmt.Errorf("failed to mark leaked deployment %q as failed: %v", failed.ID, err)
}
}

return nil
}

// reconcileQueuedAllocations re-calculates the queued allocations for every job that we
// created a Job Summary during the snap shot restore
func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
Expand Down
18 changes: 18 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2740,6 +2740,24 @@ func TestFSM_ReconcileSummaries(t *testing.T) {
}
}

func TestFSM_LeakedDeployments(t *testing.T) {
t.Parallel()
require := require.New(t)

// Add some state
fsm := testFSM(t)
state := fsm.State()
d := mock.Deployment()
require.NoError(state.UpsertDeployment(1000, d))

// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
out, _ := state2.DeploymentByID(nil, d.ID)
require.NotNil(out)
require.Equal(structs.DeploymentStatusCancelled, out.Status)
}

func TestFSM_Autopilot(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
Expand Down

0 comments on commit 689d396

Please sign in to comment.