Skip to content

Commit

Permalink
deployment watcher: fix goroutine leak when job is purged
Browse files Browse the repository at this point in the history
The deployment watcher on the leader makes blocking queries to detect when the
set of active deployments changes. It takes the resulting list of deployments
and adds or removes watchers based on whether the deployment is active. But when
a job is purged, the deployment will be deleted. This unblocks the query but
the query result only shows the remaining deployments.

When the query unblocks, ensure that all active watchers have a corresponding
deployment in state. If not, remove the watcher so that the goroutine stops.

Fixes: #19988
  • Loading branch information
tgross committed Apr 10, 2024
1 parent a7c56a6 commit 004fb71
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 4 deletions.
3 changes: 3 additions & 0 deletions .changelog/20348.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
deployments: Fixed a goroutine leak when jobs are purged
```
26 changes: 22 additions & 4 deletions nomad/deploymentwatcher/deployments_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,7 @@ func (w *Watcher) watchDeployments(ctx context.Context) {
// Update the latest index
dindex = idx

// Ensure we are tracking the things we should and not tracking what we
// shouldn't be
// Ensure we are tracking only active deployments
for _, d := range deployments {
if d.Active() {
if err := w.add(d); err != nil {
Expand All @@ -191,6 +190,21 @@ func (w *Watcher) watchDeployments(ctx context.Context) {
w.remove(d)
}
}

// Ensure we're not tracking deployments that have been deleted because
// the job was purged
for _, watcher := range w.watchers {
var found bool
for _, d := range deployments {
if watcher.deploymentID == d.ID {
found = true
break
}
}
if !found {
w.removeByID(watcher.deploymentID)
}
}
}
}

Expand Down Expand Up @@ -285,6 +299,10 @@ func (w *Watcher) addLocked(d *structs.Deployment) (*deploymentWatcher, error) {
// remove stops watching a deployment. This can be because the deployment is
// complete or being deleted.
func (w *Watcher) remove(d *structs.Deployment) {
w.removeByID(d.ID)
}

func (w *Watcher) removeByID(id string) {
w.l.Lock()
defer w.l.Unlock()

Expand All @@ -293,9 +311,9 @@ func (w *Watcher) remove(d *structs.Deployment) {
return
}

if watcher, ok := w.watchers[d.ID]; ok {
if watcher, ok := w.watchers[id]; ok {
watcher.StopWatch()
delete(w.watchers, d.ID)
delete(w.watchers, id)
}
}

Expand Down
51 changes: 51 additions & 0 deletions nomad/deploymentwatcher/deployments_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/stretchr/testify/assert"
mocker "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -2081,3 +2082,53 @@ func watchersCount(w *Watcher) int {

return len(w.watchers)
}

// TestWatcher_PurgeDeployment tests that we don't leak watchers if a job is purged
func TestWatcher_PurgeDeployment(t *testing.T) {
ci.Parallel(t)
w, m := defaultTestDeploymentWatcher(t)

// clear UpdateDeploymentStatus default expectation
m.Mock.ExpectedCalls = nil

// Create a job and a deployment
j := mock.Job()
d := mock.Deployment()
d.JobID = j.ID
must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j))
must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d))

// require that we get a call to UpsertDeploymentStatusUpdate
matchConfig := &matchDeploymentStatusUpdateConfig{
DeploymentID: d.ID,
Status: structs.DeploymentStatusPaused,
StatusDescription: structs.DeploymentStatusDescriptionPaused,
}
matcher := matchDeploymentStatusUpdateRequest(matchConfig)
m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher)).Return(nil)

w.SetEnabled(true, m.state)
must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
if watchersCount(w) != 1 {
return fmt.Errorf("expected 1 deployment")
}
return nil
}),
wait.Attempts(100),
wait.Gap(10*time.Millisecond),
))

must.NoError(t, m.state.DeleteJob(m.nextIndex(), j.Namespace, j.ID))

must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
if watchersCount(w) != 0 {
return fmt.Errorf("expected deployment watcher to be stopped")
}
return nil
}),
wait.Attempts(500),
wait.Gap(10*time.Millisecond),
))
}

0 comments on commit 004fb71

Please sign in to comment.