diff --git a/.changelog/20348.txt b/.changelog/20348.txt new file mode 100644 index 00000000000..f8ac880234c --- /dev/null +++ b/.changelog/20348.txt @@ -0,0 +1,3 @@ +```release-note:bug +deployments: Fixed a goroutine leak when jobs are purged +``` diff --git a/nomad/deploymentwatcher/deployments_watcher.go b/nomad/deploymentwatcher/deployments_watcher.go index b32b9a30a9c..6c909111c91 100644 --- a/nomad/deploymentwatcher/deployments_watcher.go +++ b/nomad/deploymentwatcher/deployments_watcher.go @@ -180,8 +180,7 @@ func (w *Watcher) watchDeployments(ctx context.Context) { // Update the latest index dindex = idx - // Ensure we are tracking the things we should and not tracking what we - // shouldn't be + // Ensure we are tracking only active deployments for _, d := range deployments { if d.Active() { if err := w.add(d); err != nil { @@ -191,6 +190,9 @@ func (w *Watcher) watchDeployments(ctx context.Context) { w.remove(d) } } + + // Ensure we've removed deployments for purged jobs + w.removeDeletedDeployments(deployments) } } @@ -236,6 +238,28 @@ func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, store *state.StateStore) (in return deploys, index, nil } +// removeDeletedDeployments removes any watchers that aren't in the list of +// deployments we got from state +func (w *Watcher) removeDeletedDeployments(deployments []*structs.Deployment) { + w.l.Lock() + defer w.l.Unlock() + + // note we can't optimize this by checking the lengths first because some + // deployments might not be active + for _, watcher := range w.watchers { + var found bool + for _, d := range deployments { + if watcher.deploymentID == d.ID { + found = true + break + } + } + if !found { + w.removeByIDLocked(watcher.deploymentID) + } + } +} + // add adds a deployment to the watch list func (w *Watcher) add(d *structs.Deployment) error { w.l.Lock() @@ -287,15 +311,18 @@ func (w *Watcher) addLocked(d *structs.Deployment) (*deploymentWatcher, error) { func (w *Watcher) remove(d *structs.Deployment) { w.l.Lock() defer w.l.Unlock() + w.removeByIDLocked(d.ID) +} +func (w *Watcher) removeByIDLocked(id string) { // Not enabled so no-op if !w.enabled { return } - if watcher, ok := w.watchers[d.ID]; ok { + if watcher, ok := w.watchers[id]; ok { watcher.StopWatch() - delete(w.watchers, d.ID) + delete(w.watchers, id) } } diff --git a/nomad/deploymentwatcher/deployments_watcher_test.go b/nomad/deploymentwatcher/deployments_watcher_test.go index cde0334bb08..a6b880441f7 100644 --- a/nomad/deploymentwatcher/deployments_watcher_test.go +++ b/nomad/deploymentwatcher/deployments_watcher_test.go @@ -17,6 +17,7 @@ import ( "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/shoenig/test/must" + "github.com/shoenig/test/wait" "github.com/stretchr/testify/assert" mocker "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -2081,3 +2082,53 @@ func watchersCount(w *Watcher) int { return len(w.watchers) } + +// TestWatcher_PurgeDeployment tests that we don't leak watchers if a job is purged +func TestWatcher_PurgeDeployment(t *testing.T) { + ci.Parallel(t) + w, m := defaultTestDeploymentWatcher(t) + + // clear UpdateDeploymentStatus default expectation + m.Mock.ExpectedCalls = nil + + // Create a job and a deployment + j := mock.Job() + d := mock.Deployment() + d.JobID = j.ID + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + + // require that we get a call to UpsertDeploymentStatusUpdate + matchConfig := &matchDeploymentStatusUpdateConfig{ + DeploymentID: d.ID, + Status: structs.DeploymentStatusPaused, + StatusDescription: structs.DeploymentStatusDescriptionPaused, + } + matcher := matchDeploymentStatusUpdateRequest(matchConfig) + m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher)).Return(nil) + + w.SetEnabled(true, m.state) + must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + if watchersCount(w) != 1 { + return fmt.Errorf("expected 1 deployment") + } + return nil + }), + wait.Attempts(100), + wait.Gap(10*time.Millisecond), + )) + + must.NoError(t, m.state.DeleteJob(m.nextIndex(), j.Namespace, j.ID)) + + must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + if watchersCount(w) != 0 { + return fmt.Errorf("expected deployment watcher to be stopped") + } + return nil + }), + wait.Attempts(500), + wait.Gap(10*time.Millisecond), + )) +}