volumewatcher: stop watcher goroutines when there's no work (#7909)
The watcher goroutines will be automatically started if a volume has
updates, but when idle we shouldn't keep a goroutine running and
taking up memory.
tgross authored May 11, 2020
1 parent 3ee7379 commit 3d6c088
Showing 2 changed files with 31 additions and 24 deletions.
3 changes: 3 additions & 0 deletions nomad/volumewatcher/volume_watcher.go
@@ -124,6 +124,9 @@ func (vw *volumeWatcher) watch() {
}
vw.volumeReap(vol)
}
default:
vw.Stop() // no pending work
return
}
}
}
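
For orientation, the hunk above shows only the new default branch. Below is a minimal sketch of how the full watch() loop might look with this change; the channel and helper names other than Stop() and volumeReap() (updateCh, shutdownCtx, ctx, getVolume) are assumptions based on the surrounding context, not verbatim source.

func (vw *volumeWatcher) watch() {
    for {
        select {
        case <-vw.shutdownCtx.Done(): // server shutdown: exit immediately
            return
        case <-vw.ctx.Done(): // this watcher was stopped explicitly
            return
        case vol := <-vw.updateCh: // a volume update is pending: process it
            vol = vw.getVolume(vol)
            if vol == nil {
                return
            }
            vw.volumeReap(vol)
        default:
            vw.Stop() // no pending work, so let the goroutine exit
            return
        }
    }
}

With the default case, the select no longer blocks while idle: the goroutine drains any pending updates and then stops itself, and it is started again only when a new volume update arrives.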
52 changes: 28 additions & 24 deletions nomad/volumewatcher/volumes_watcher_test.go
@@ -92,10 +92,9 @@ func TestVolumeWatch_Checkpoint(t *testing.T) {
// step-up again
watcher.SetEnabled(true, srv.State())
require.Eventually(func() bool {
return 1 == len(watcher.watchers)
return 1 == len(watcher.watchers) &&
!watcher.watchers[vol.ID+vol.Namespace].isRunning()
}, time.Second, 10*time.Millisecond)

require.True(watcher.watchers[vol.ID+vol.Namespace].isRunning())
}

// TestVolumeWatch_StartStop tests the start and stop of the watcher when
@@ -123,33 +122,33 @@ func TestVolumeWatch_StartStop(t *testing.T) {

plugin := mock.CSIPlugin()
node := testNode(nil, plugin, srv.State())
alloc := mock.Alloc()
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc1 := mock.Alloc()
alloc1.ClientStatus = structs.AllocClientStatusRunning
alloc2 := mock.Alloc()
alloc2.Job = alloc.Job
alloc2.Job = alloc1.Job
alloc2.ClientStatus = structs.AllocClientStatusRunning
index++
err := srv.State().UpsertJob(index, alloc.Job)
err := srv.State().UpsertJob(index, alloc1.Job)
require.NoError(err)
index++
err = srv.State().UpsertAllocs(index, []*structs.Allocation{alloc, alloc2})
err = srv.State().UpsertAllocs(index, []*structs.Allocation{alloc1, alloc2})
require.NoError(err)

// register a volume
vol := testVolume(nil, plugin, alloc, node.ID)
vol := testVolume(nil, plugin, alloc1, node.ID)
index++
err = srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol})
require.NoError(err)

// assert we get a running watcher
// assert we get a watcher; there are no claims so it should immediately stop
require.Eventually(func() bool {
return 1 == len(watcher.watchers)
}, time.Second, 10*time.Millisecond)
require.True(watcher.watchers[vol.ID+vol.Namespace].isRunning())
return 1 == len(watcher.watchers) &&
!watcher.watchers[vol.ID+vol.Namespace].isRunning()
}, time.Second*2, 10*time.Millisecond)

// claim the volume for both allocs
claim := &structs.CSIVolumeClaim{
AllocationID: alloc.ID,
AllocationID: alloc1.ID,
NodeID: node.ID,
Mode: structs.CSIVolumeClaimRead,
}
@@ -163,33 +162,38 @@ func TestVolumeWatch_StartStop(t *testing.T) {

// reap the volume and assert nothing has happened
claim = &structs.CSIVolumeClaim{
AllocationID: alloc.ID,
AllocationID: alloc1.ID,
NodeID: node.ID,
Mode: structs.CSIVolumeClaimRelease,
}
index++
err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
require.NoError(err)
require.True(watcher.watchers[vol.ID+vol.Namespace].isRunning())

ws := memdb.NewWatchSet()
vol, _ = srv.State().CSIVolumeByID(ws, vol.Namespace, vol.ID)
require.Equal(2, len(vol.ReadAllocs))

// alloc becomes terminal
alloc.ClientStatus = structs.AllocClientStatusComplete
alloc1.ClientStatus = structs.AllocClientStatusComplete
index++
err = srv.State().UpsertAllocs(index, []*structs.Allocation{alloc})
err = srv.State().UpsertAllocs(index, []*structs.Allocation{alloc1})
require.NoError(err)
index++
claim.State = structs.CSIVolumeClaimStateReadyToFree
err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
require.NoError(err)

// 1 claim has been released but watcher is still running
// 1 claim has been released and watcher stops
require.Eventually(func() bool {
ws := memdb.NewWatchSet()
vol, _ := srv.State().CSIVolumeByID(ws, vol.Namespace, vol.ID)
return len(vol.ReadAllocs) == 1 && len(vol.PastClaims) == 0
}, time.Second*2, 10*time.Millisecond)

require.True(watcher.watchers[vol.ID+vol.Namespace].isRunning())
require.Eventually(func() bool {
return !watcher.watchers[vol.ID+vol.Namespace].isRunning()
}, time.Second*5, 10*time.Millisecond)

// the watcher will have incremented the index so we need to make sure
// our inserts will trigger new events
@@ -209,16 +213,16 @@ func TestVolumeWatch_StartStop(t *testing.T) {
err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
require.NoError(err)

// all claims have been released and watcher is stopped
// all claims have been released and watcher has stopped again
require.Eventually(func() bool {
ws := memdb.NewWatchSet()
vol, _ := srv.State().CSIVolumeByID(ws, vol.Namespace, vol.ID)
return len(vol.ReadAllocs) == 1 && len(vol.PastClaims) == 0
return len(vol.ReadAllocs) == 0 && len(vol.PastClaims) == 0
}, time.Second*2, 10*time.Millisecond)

require.Eventually(func() bool {
return !watcher.watchers[vol.ID+vol.Namespace].isRunning()
}, time.Second*1, 10*time.Millisecond)
}, time.Second*5, 10*time.Millisecond)

// the watcher will have incremented the index so we need to make sure
// our inserts will trigger new events
@@ -242,7 +246,7 @@ func TestVolumeWatch_StartStop(t *testing.T) {
// a stopped watcher should restore itself on notification
require.Eventually(func() bool {
return watcher.watchers[vol.ID+vol.Namespace].isRunning()
}, time.Second*1, 10*time.Millisecond)
}, time.Second*5, 10*time.Millisecond)
}

// TestVolumeWatch_RegisterDeregister tests the start and stop of
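The assertions above poll isRunning() through require.Eventually because the watcher goroutine exits asynchronously after Stop() and is restored on the next notification. A minimal sketch of what such a pair of helpers could look like follows; the mutex and field names (wLock, running, exitFn) are assumptions for illustration, not the actual struct layout.

// isRunning reports whether the watcher goroutine is currently active.
func (vw *volumeWatcher) isRunning() bool {
    vw.wLock.RLock()
    defer vw.wLock.RUnlock()
    return vw.running
}

// Stop cancels the watcher's context so a blocked watch() returns,
// and records that the goroutine is no longer running.
func (vw *volumeWatcher) Stop() {
    vw.wLock.Lock()
    defer vw.wLock.Unlock()
    vw.exitFn() // assumed to be the context.CancelFunc paired with vw.ctx
    vw.running = false
}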
