diff --git a/.changelog/11776.txt b/.changelog/11776.txt
new file mode 100644
index 00000000000..9a03744953f
--- /dev/null
+++ b/.changelog/11776.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+csi: Fixed a bug where volume claim releases that were not fully processed before a leadership transition would be ignored
+```
diff --git a/nomad/volumewatcher/volume_watcher.go b/nomad/volumewatcher/volume_watcher.go
index 80304c57440..28fc94f3536 100644
--- a/nomad/volumewatcher/volume_watcher.go
+++ b/nomad/volumewatcher/volume_watcher.go
@@ -104,6 +104,13 @@ func (vw *volumeWatcher) isRunning() bool {
 // Each pass steps the volume's claims through the various states of reaping
 // until the volume has no more claims eligible to be reaped.
 func (vw *volumeWatcher) watch() {
+	// always denormalize the volume and call reap when we first start
+	// the watcher so that we ensure we don't drop events that
+	// happened during leadership transitions and didn't get completed
+	// by the prior leader
+	vol := vw.getVolume(vw.v)
+	vw.volumeReap(vol)
+
 	for {
 		select {
 		// TODO(tgross): currently server->client RPC have no cancellation
diff --git a/nomad/volumewatcher/volumes_watcher_test.go b/nomad/volumewatcher/volumes_watcher_test.go
index 33267ffeff8..185a7225e68 100644
--- a/nomad/volumewatcher/volumes_watcher_test.go
+++ b/nomad/volumewatcher/volumes_watcher_test.go
@@ -52,9 +52,9 @@ func TestVolumeWatch_EnableDisable(t *testing.T) {
 	require.Equal(0, len(watcher.watchers))
 }
 
-// TestVolumeWatch_Checkpoint tests the checkpointing of progress across
-// leader leader step-up/step-down
-func TestVolumeWatch_Checkpoint(t *testing.T) {
+// TestVolumeWatch_LeadershipTransition tests the correct behavior of
+// claim reaping across leader step-up/step-down
+func TestVolumeWatch_LeadershipTransition(t *testing.T) {
 	t.Parallel()
 	require := require.New(t)
 
@@ -84,18 +84,51 @@ func TestVolumeWatch_Checkpoint(t *testing.T) {
 		return 1 == len(watcher.watchers)
 	}, time.Second, 10*time.Millisecond)
 
-	// step-down (this is sync, but step-up is async)
+	vol, _ = srv.State().CSIVolumeByID(nil, vol.Namespace, vol.ID)
+	require.Len(vol.PastClaims, 0, "expected to have 0 PastClaims")
+	require.Equal(srv.countCSIUnpublish, 0, "expected no CSI.Unpublish RPC calls")
+
+	// trying to test a dropped watch is racy, so to reliably simulate
+	// this condition, step-down the watcher first and then perform
+	// the writes to the volume before starting the new watcher. no
+	// watches for that change will fire on the new watcher
+
+	// step-down (this is sync)
 	watcher.SetEnabled(false, nil)
 	require.Equal(0, len(watcher.watchers))
 
-	// step-up again
+	// allocation is now invalid
+	index++
+	err = srv.State().DeleteEval(index, []string{}, []string{alloc.ID})
+	require.NoError(err)
+
+	// emit a GC so that we have a volume change that's dropped
+	claim := &structs.CSIVolumeClaim{
+		AllocationID: alloc.ID,
+		NodeID:       node.ID,
+		Mode:         structs.CSIVolumeClaimGC,
+		State:        structs.CSIVolumeClaimStateUnpublishing,
+	}
+	index++
+	err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
+	require.NoError(err)
+
+	// create a new watcher and enable it to simulate the leadership
+	// transition
+	watcher = NewVolumesWatcher(testlog.HCLogger(t), srv, "")
 	watcher.SetEnabled(true, srv.State())
+
 	require.Eventually(func() bool {
 		watcher.wlock.RLock()
 		defer watcher.wlock.RUnlock()
 
-		return 1 == len(watcher.watchers)
+		return 1 == len(watcher.watchers) &&
+			!watcher.watchers[vol.ID+vol.Namespace].isRunning()
 	}, time.Second, 10*time.Millisecond)
+
+	vol, _ = srv.State().CSIVolumeByID(nil, vol.Namespace, vol.ID)
+	require.Len(vol.PastClaims, 1, "expected to have 1 PastClaim")
+	require.Equal(srv.countCSIUnpublish, 1, "expected CSI.Unpublish RPC to be called")
 }
 
 // TestVolumeWatch_StartStop tests the start and stop of the watcher when
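The fix in volume_watcher.go follows a general pattern for watchers driven by change notifications: process the current state once before blocking in the wait loop, so that work recorded before the watcher existed (for example, under the previous leader) is handled even though no new notification will ever fire for it. The sketch below is an illustration of that pattern only, not Nomad's implementation; the `claimStore`, `reap`, and `watch` names and the channel-based notification are hypothetical stand-ins.

```go
package main

import (
	"fmt"
	"time"
)

// claimStore is a stand-in for the state the watcher reads.
type claimStore struct {
	pending []string      // claims written before this watcher started
	updates chan struct{} // receives a signal on each new write
}

// reap drains whatever is currently pending (the role volumeReap plays).
func (s *claimStore) reap() {
	for _, c := range s.pending {
		fmt.Println("reaping:", c)
	}
	s.pending = nil
}

// watch reaps once up front, then again on every update. Without the
// initial reap, a claim written before watch() started (e.g. under the
// prior leader) would sit unprocessed until some unrelated update arrived.
func (s *claimStore) watch(stop <-chan struct{}) {
	s.reap()
	for {
		select {
		case <-s.updates:
			s.reap()
		case <-stop:
			return
		}
	}
}

func main() {
	s := &claimStore{
		pending: []string{"claim-left-by-previous-leader"},
		updates: make(chan struct{}, 1),
	}
	stop := make(chan struct{})
	go s.watch(stop)

	time.Sleep(50 * time.Millisecond) // give the watcher a moment to run
	close(stop)
}
```

In the patch itself, the initial pass is the `vol := vw.getVolume(vw.v)` / `vw.volumeReap(vol)` call inserted ahead of the `for { select { ... } }` loop, and the test exercises exactly the dropped-notification case by writing the claim while no watcher is running.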