From aff7296f1e45aba78cd59ff3c7a37d2c7e882173 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 4 Jan 2022 15:25:58 -0500 Subject: [PATCH] csi: reap unused volume claims at leadership transitions When `volumewatcher.Watcher` starts on the leader, it starts a watch on every volume and triggers a reap of unused claims on any change to that volume. But if a reaping is in-flight during leadership transitions, it will fail and the event that triggered the reap will be dropped. Perform one reap of unused claims at the start of the watcher so that leadership transitions don't drop this event. --- .changelog/11776.txt | 3 ++ nomad/volumewatcher/volume_watcher.go | 7 ++++ nomad/volumewatcher/volumes_watcher_test.go | 41 +++++++++++++++++++-- 3 files changed, 47 insertions(+), 4 deletions(-) create mode 100644 .changelog/11776.txt diff --git a/.changelog/11776.txt b/.changelog/11776.txt new file mode 100644 index 00000000000..9a03744953f --- /dev/null +++ b/.changelog/11776.txt @@ -0,0 +1,3 @@ +```release-note:bug +csi: Fixed a bug where volume claim releases that were not fully processed before a leadership transition would be ignored +``` diff --git a/nomad/volumewatcher/volume_watcher.go b/nomad/volumewatcher/volume_watcher.go index 80304c57440..28fc94f3536 100644 --- a/nomad/volumewatcher/volume_watcher.go +++ b/nomad/volumewatcher/volume_watcher.go @@ -104,6 +104,13 @@ func (vw *volumeWatcher) isRunning() bool { // Each pass steps the volume's claims through the various states of reaping // until the volume has no more claims eligible to be reaped. func (vw *volumeWatcher) watch() { + // always denormalize the volume and call reap when we first start + // the watcher so that we ensure we don't drop events that + // happened during leadership transitions and didn't get completed + // by the prior leader + vol := vw.getVolume(vw.v) + vw.volumeReap(vol) + for { select { // TODO(tgross): currently server->client RPC have no cancellation diff --git a/nomad/volumewatcher/volumes_watcher_test.go b/nomad/volumewatcher/volumes_watcher_test.go index 33267ffeff8..5aeec73ffd9 100644 --- a/nomad/volumewatcher/volumes_watcher_test.go +++ b/nomad/volumewatcher/volumes_watcher_test.go @@ -52,8 +52,8 @@ func TestVolumeWatch_EnableDisable(t *testing.T) { require.Equal(0, len(watcher.watchers)) } -// TestVolumeWatch_Checkpoint tests the checkpointing of progress across -// leader leader step-up/step-down +// TestVolumeWatch_LeadershipTransition tests the correct behavior of +// claim reaping across leader step-up/step-down func TestVolumeWatch_Checkpoint(t *testing.T) { t.Parallel() require := require.New(t) @@ -84,18 +84,51 @@ func TestVolumeWatch_Checkpoint(t *testing.T) { return 1 == len(watcher.watchers) }, time.Second, 10*time.Millisecond) - // step-down (this is sync, but step-up is async) + vol, _ = srv.State().CSIVolumeByID(nil, vol.Namespace, vol.ID) + require.Len(vol.PastClaims, 0, "expected to have 0 PastClaims") + require.Equal(srv.countCSIUnpublish, 0, "expected no CSI.Unpublish RPC calls") + + // trying to test a dropped watch is racy, so to reliably simulate + // this condition, step-down the watcher first and then perform + // the writes to the volume before starting the new watcher. no + // watches for that change will fire on the new watcher + + // step-down (this is sync) watcher.SetEnabled(false, nil) require.Equal(0, len(watcher.watchers)) - // step-up again + // allocation is now invalid + index++ + err = srv.State().DeleteEval(index, []string{}, []string{alloc.ID}) + require.NoError(err) + + // emit a GC so that we have a volume change that's dropped + claim := &structs.CSIVolumeClaim{ + AllocationID: alloc.ID, + NodeID: node.ID, + Mode: structs.CSIVolumeClaimGC, + State: structs.CSIVolumeClaimStateUnpublishing, + } + index++ + err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim) + require.NoError(err) + + // create a new watcher and enable it to simulate the leadership + // transition + watcher = NewVolumesWatcher(testlog.HCLogger(t), srv, "") watcher.SetEnabled(true, srv.State()) + require.Eventually(func() bool { watcher.wlock.RLock() defer watcher.wlock.RUnlock() + return 1 == len(watcher.watchers) && !watcher.watchers[vol.ID+vol.Namespace].isRunning() }, time.Second, 10*time.Millisecond) + + vol, _ = srv.State().CSIVolumeByID(nil, vol.Namespace, vol.ID) + require.Len(vol.PastClaims, 1, "expected to have 1 PastClaim") + require.Equal(srv.countCSIUnpublish, 1, "expected CSI.Unpublish RPC to be called") } // TestVolumeWatch_StartStop tests the start and stop of the watcher when