Skip to content

Commit

Permalink
Backport of deployment watcher: fix goroutine leak when job is purged (
Browse files Browse the repository at this point in the history
…#20348) (#20359)

The deployment watcher on the leader makes blocking queries to detect when the
set of active deployments changes. It takes the resulting list of deployments
and adds or removes watchers based on whether the deployment is active. But when
a job is purged, the deployment will be deleted. This unblocks the query but
the query result only shows the remaining deployments.

When the query unblocks, ensure that all active watchers have a corresponding
deployment in state. If not, remove the watcher so that the goroutine stops.

Fixes: #19988

Co-authored-by: Tim Gross <[email protected]>
  • Loading branch information
hc-github-team-nomad-core and tgross authored Apr 11, 2024
1 parent ba96863 commit 5b1cfb9
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 4 deletions.
3 changes: 3 additions & 0 deletions .changelog/20348.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
deployments: Fixed a goroutine leak when jobs are purged
```
35 changes: 31 additions & 4 deletions nomad/deploymentwatcher/deployments_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,7 @@ func (w *Watcher) watchDeployments(ctx context.Context) {
// Update the latest index
dindex = idx

// Ensure we are tracking the things we should and not tracking what we
// shouldn't be
// Ensure we are tracking only active deployments
for _, d := range deployments {
if d.Active() {
if err := w.add(d); err != nil {
Expand All @@ -191,6 +190,9 @@ func (w *Watcher) watchDeployments(ctx context.Context) {
w.remove(d)
}
}

// Ensure we've removed deployments for purged jobs
w.removeDeletedDeployments(deployments)
}
}

Expand Down Expand Up @@ -236,6 +238,28 @@ func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, store *state.StateStore) (in
return deploys, index, nil
}

// removeDeletedDeployments removes any watchers that aren't in the list of
// deployments we got from state
func (w *Watcher) removeDeletedDeployments(deployments []*structs.Deployment) {
w.l.Lock()
defer w.l.Unlock()

// note we can't optimize this by checking the lengths first because some
// deployments might not be active
for _, watcher := range w.watchers {
var found bool
for _, d := range deployments {
if watcher.deploymentID == d.ID {
found = true
break
}
}
if !found {
w.removeByIDLocked(watcher.deploymentID)
}
}
}

// add adds a deployment to the watch list
func (w *Watcher) add(d *structs.Deployment) error {
w.l.Lock()
Expand Down Expand Up @@ -287,15 +311,18 @@ func (w *Watcher) addLocked(d *structs.Deployment) (*deploymentWatcher, error) {
func (w *Watcher) remove(d *structs.Deployment) {
w.l.Lock()
defer w.l.Unlock()
w.removeByIDLocked(d.ID)
}

func (w *Watcher) removeByIDLocked(id string) {
// Not enabled so no-op
if !w.enabled {
return
}

if watcher, ok := w.watchers[d.ID]; ok {
if watcher, ok := w.watchers[id]; ok {
watcher.StopWatch()
delete(w.watchers, d.ID)
delete(w.watchers, id)
}
}

Expand Down
52 changes: 52 additions & 0 deletions nomad/deploymentwatcher/deployments_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/stretchr/testify/assert"
mocker "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1996,3 +1998,53 @@ func watchersCount(w *Watcher) int {

return len(w.watchers)
}

// TestWatcher_PurgeDeployment tests that we don't leak watchers if a job is purged
func TestWatcher_PurgeDeployment(t *testing.T) {
ci.Parallel(t)
w, m := defaultTestDeploymentWatcher(t)

// clear UpdateDeploymentStatus default expectation
m.Mock.ExpectedCalls = nil

// Create a job and a deployment
j := mock.Job()
d := mock.Deployment()
d.JobID = j.ID
must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j))

Check failure on line 2014 in nomad/deploymentwatcher/deployments_watcher_test.go

View workflow job for this annotation

GitHub Actions / tests-groups (quick)

too many arguments in call to m.state.UpsertJob

Check failure on line 2014 in nomad/deploymentwatcher/deployments_watcher_test.go

View workflow job for this annotation

GitHub Actions / tests-groups (quick)

too many arguments in call to m.state.UpsertJob
must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d))

// require that we get a call to UpsertDeploymentStatusUpdate
matchConfig := &matchDeploymentStatusUpdateConfig{
DeploymentID: d.ID,
Status: structs.DeploymentStatusPaused,
StatusDescription: structs.DeploymentStatusDescriptionPaused,
}
matcher := matchDeploymentStatusUpdateRequest(matchConfig)
m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher)).Return(nil)

w.SetEnabled(true, m.state)
must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
if watchersCount(w) != 1 {
return fmt.Errorf("expected 1 deployment")
}
return nil
}),
wait.Attempts(100),
wait.Gap(10*time.Millisecond),
))

must.NoError(t, m.state.DeleteJob(m.nextIndex(), j.Namespace, j.ID))

must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
if watchersCount(w) != 0 {
return fmt.Errorf("expected deployment watcher to be stopped")
}
return nil
}),
wait.Attempts(500),
wait.Gap(10*time.Millisecond),
))
}

0 comments on commit 5b1cfb9

Please sign in to comment.