Skip to content

Commit

Permalink
csi: move volumewatcher's terminal collection into denormalization
Browse files Browse the repository at this point in the history
Remove logic in the volumewatcher that now lives in the state
store's `CSIVolumeDenormalize` method.

(3/3 refactoring commits)
  • Loading branch information
tgross committed Jan 25, 2022
1 parent e90ce03 commit d4670ad
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 13 deletions.
15 changes: 3 additions & 12 deletions nomad/volumewatcher/volume_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,27 +177,18 @@ func (vw *volumeWatcher) isUnclaimed(vol *structs.CSIVolume) bool {
return len(vol.ReadClaims) == 0 && len(vol.WriteClaims) == 0 && len(vol.PastClaims) == 0
}

// volumeReapImpl unpublished all the volume's PastClaims. PastClaims
// will be populated from nil or terminal allocs when we call
// CSIVolumeDenormalize(), so this assumes we've done so in the caller
func (vw *volumeWatcher) volumeReapImpl(vol *structs.CSIVolume) error {

// PastClaims written by a volume GC core job will have no allocation,
// so we need to find out which allocs are eligible for cleanup.
for _, claim := range vol.PastClaims {
if claim.AllocationID == "" {
vol = vw.collectPastClaims(vol)
break // only need to collect once
}
}

var result *multierror.Error
for _, claim := range vol.PastClaims {
err := vw.unpublish(vol, claim)
if err != nil {
result = multierror.Append(result, err)
}
}

return result.ErrorOrNil()

}

func (vw *volumeWatcher) collectPastClaims(vol *structs.CSIVolume) *structs.CSIVolume {
Expand Down
4 changes: 4 additions & 0 deletions nomad/volumewatcher/volume_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestVolumeWatch_Reap(t *testing.T) {
logger: testlog.HCLogger(t),
}

vol, _ = srv.State().CSIVolumeDenormalize(nil, vol.Copy())
err := w.volumeReapImpl(vol)
require.NoError(err)

Expand All @@ -48,6 +49,7 @@ func TestVolumeWatch_Reap(t *testing.T) {
State: structs.CSIVolumeClaimStateNodeDetached,
},
}
vol, _ = srv.State().CSIVolumeDenormalize(nil, vol.Copy())
err = w.volumeReapImpl(vol)
require.NoError(err)
require.Len(vol.PastClaims, 1)
Expand All @@ -59,6 +61,7 @@ func TestVolumeWatch_Reap(t *testing.T) {
Mode: structs.CSIVolumeClaimGC,
},
}
vol, _ = srv.State().CSIVolumeDenormalize(nil, vol.Copy())
err = w.volumeReapImpl(vol)
require.NoError(err)
require.Len(vol.PastClaims, 2) // alloc claim + GC claim
Expand All @@ -71,6 +74,7 @@ func TestVolumeWatch_Reap(t *testing.T) {
Mode: structs.CSIVolumeClaimRead,
},
}
vol, _ = srv.State().CSIVolumeDenormalize(nil, vol.Copy())
err = w.volumeReapImpl(vol)
require.NoError(err)
require.Len(vol.PastClaims, 2) // alloc claim + GC claim
Expand Down
2 changes: 1 addition & 1 deletion nomad/volumewatcher/volumes_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {
plugin := mock.CSIPlugin()
node := testNode(plugin, srv.State())
alloc := mock.Alloc()
alloc.ClientStatus = structs.AllocClientStatusComplete
alloc.ClientStatus = structs.AllocClientStatusRunning
vol := testVolume(plugin, alloc, node.ID)

index++
Expand Down

0 comments on commit d4670ad

Please sign in to comment.