From 689d3966ad1cf1af7ae37b55c2cb3374bf075fca Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 23 May 2018 16:44:21 -0700 Subject: [PATCH] Clean up leaked deployments on restoration This PR cancels deployments that are active but do not have a job associated with them. This is a broken invariant that causes issues in the deployment watcher since it will not track them. Thus they are objects that can't be operated on or cleaned up. Fixes https://github.com/hashicorp/nomad/issues/4286 --- nomad/fsm.go | 59 +++++++++++++++++++++++++++++++++++++++++++++++ nomad/fsm_test.go | 18 +++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/nomad/fsm.go b/nomad/fsm.go index faa4e14bf5b..d507ebf9b80 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -1261,6 +1261,12 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { } } + // COMPAT Remove in 0.10 + // Clean up active deployments that do not have a job + if err := n.failLeakedDeployments(newState); err != nil { + return err + } + // External code might be calling State(), so we need to synchronize // here to make sure we swap in the new state store atomically. n.stateLock.Lock() @@ -1276,6 +1282,59 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { return nil } +// failLeakedDeployments is used to fail deployments that do not have a job. +// This state is a broken invariant that should not occur since 0.8.X. +func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error { + // Scan for deployments that are referencing a job that no longer exists. + // This could happen if multiple deployments were created for a given job + // and thus the older deployment leaks and then the job is removed. + iter, err := state.Deployments(nil) + if err != nil { + return fmt.Errorf("failed to query deployments: %v", err) + } + + dindex, err := state.Index("deployment") + if err != nil { + return fmt.Errorf("couldn't fetch index of deployments table: %v", err) + } + + for { + raw := iter.Next() + if raw == nil { + break + } + + d := raw.(*structs.Deployment) + + // We are only looking for active deployments where the job no longer + // exists + if !d.Active() { + continue + } + + // Find the job + job, err := state.JobByID(nil, d.Namespace, d.JobID) + if err != nil { + return fmt.Errorf("failed to lookup job %s from deployment %q: %v", d.JobID, d.ID, err) + } + + // Job exists. + if job != nil { + continue + } + + // Update the deployment to be terminal + failed := d.Copy() + failed.Status = structs.DeploymentStatusCancelled + failed.StatusDescription = structs.DeploymentStatusDescriptionStoppedJob + if err := state.UpsertDeployment(dindex, failed); err != nil { + return fmt.Errorf("failed to mark leaked deployment %q as failed: %v", failed.ID, err) + } + } + + return nil +} + // reconcileQueuedAllocations re-calculates the queued allocations for every job that we // created a Job Summary during the snap shot restore func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error { diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 14f984b2b60..0f8156de0da 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -2740,6 +2740,24 @@ func TestFSM_ReconcileSummaries(t *testing.T) { } } +func TestFSM_LeakedDeployments(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Add some state + fsm := testFSM(t) + state := fsm.State() + d := mock.Deployment() + require.NoError(state.UpsertDeployment(1000, d)) + + // Verify the contents + fsm2 := testSnapshotRestore(t, fsm) + state2 := fsm2.State() + out, _ := state2.DeploymentByID(nil, d.ID) + require.NotNil(out) + require.Equal(structs.DeploymentStatusCancelled, out.Status) +} + func TestFSM_Autopilot(t *testing.T) { t.Parallel() fsm := testFSM(t)