diff --git a/.changelog/14675.txt b/.changelog/14675.txt
new file mode 100644
index 00000000000..2efa8ce2bb1
--- /dev/null
+++ b/.changelog/14675.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+csi: Fixed a bug where a volume that was successfully unmounted by the client but then failed controller unpublishing would not be marked free until garbage collection ran.
+```
diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go
index 1ec20f538d8..8ca4e7a2ef6 100644
--- a/nomad/core_sched_test.go
+++ b/nomad/core_sched_test.go
@@ -2459,7 +2459,7 @@ func TestCoreScheduler_CSIBadState_ClaimGC(t *testing.T) {
 			}
 		}
 		return true
-	}, time.Second*1, 10*time.Millisecond, "invalid claims should be marked for GC")
+	}, time.Second*5, 10*time.Millisecond, "invalid claims should be marked for GC")
 
 }
 
diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go
index cdf3d4cd395..d3cd71cdab6 100644
--- a/nomad/csi_endpoint.go
+++ b/nomad/csi_endpoint.go
@@ -574,6 +574,16 @@ func (v *CSIVolume) Unpublish(args *structs.CSIVolumeUnpublishRequest, reply *st
 
 	claim := args.Claim
 
+	// we need to checkpoint when we first get the claim to ensure we've set
+	// the initial "past claim" state; otherwise the claim from a client that
+	// unpublishes (skipping the node unpublish because it's already done that
+	// work) would fail to get written if the controller unpublish fails.
+	vol = vol.Copy()
+	err = v.checkpointClaim(vol, claim)
+	if err != nil {
+		return err
+	}
+
 	// previous checkpoints may have set the past claim state already.
 	// in practice we should never see CSIVolumeClaimStateControllerDetached
 	// but having an option for the state makes it easy to add a checkpoint
@@ -619,6 +629,13 @@ RELEASE_CLAIM:
 // problems. This function should only be called on a copy of the volume.
 func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
 	v.logger.Trace("node unpublish", "vol", vol.ID)
+
+	// We need a new snapshot after each checkpoint
+	snap, err := v.srv.fsm.State().Snapshot()
+	if err != nil {
+		return err
+	}
+
 	if claim.AllocationID != "" {
 		err := v.nodeUnpublishVolumeImpl(vol, claim)
 		if err != nil {
@@ -631,8 +648,7 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C
 	// The RPC sent from the 'nomad node detach' command or GC won't have an
 	// allocation ID set so we try to unpublish every terminal or invalid
 	// alloc on the node, all of which will be in PastClaims after denormalizing
-	state := v.srv.fsm.State()
-	vol, err := state.CSIVolumeDenormalize(memdb.NewWatchSet(), vol)
+	vol, err = snap.CSIVolumeDenormalize(memdb.NewWatchSet(), vol)
 	if err != nil {
 		return err
 	}
@@ -702,10 +718,15 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
 		return nil
 	}
 
-	state := v.srv.fsm.State()
+	// We need a new snapshot after each checkpoint
+	snap, err := v.srv.fsm.State().Snapshot()
+	if err != nil {
+		return err
+	}
+
 	ws := memdb.NewWatchSet()
 
-	plugin, err := state.CSIPluginByID(ws, vol.PluginID)
+	plugin, err := snap.CSIPluginByID(ws, vol.PluginID)
 	if err != nil {
 		return fmt.Errorf("could not query plugin: %v", err)
 	} else if plugin == nil {
@@ -717,7 +738,7 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
 		return nil
 	}
 
-	vol, err = state.CSIVolumeDenormalize(ws, vol)
+	vol, err = snap.CSIVolumeDenormalize(ws, vol)
 	if err != nil {
 		return err
 	}
diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go
index b8c54d3855e..0a91ec6a04a 100644
--- a/nomad/csi_endpoint_test.go
+++ b/nomad/csi_endpoint_test.go
@@ -568,8 +568,8 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
 
 			// setup: create an alloc that will claim our volume
 			alloc := mock.BatchAlloc()
-			alloc.NodeID = node.ID
-			alloc.ClientStatus = structs.AllocClientStatusFailed
+			alloc.NodeID = tc.nodeID
+			alloc.ClientStatus = structs.AllocClientStatusRunning
 
 			otherAlloc := mock.BatchAlloc()
 			otherAlloc.NodeID = tc.otherNodeID
@@ -579,7 +579,7 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
 			require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, index,
 				[]*structs.Allocation{alloc, otherAlloc}))
 
-			// setup: claim the volume for our alloc
+			// setup: claim the volume for our to-be-failed alloc
 			claim := &structs.CSIVolumeClaim{
 				AllocationID: alloc.ID,
 				NodeID:       node.ID,
@@ -617,10 +617,19 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
 				},
 			}
 
+			alloc = alloc.Copy()
+			alloc.ClientStatus = structs.AllocClientStatusFailed
+			index++
+			require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, index,
+				[]*structs.Allocation{alloc}))
+
 			err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Unpublish", req,
 				&structs.CSIVolumeUnpublishResponse{})
 
-			vol, volErr := state.CSIVolumeByID(nil, ns, volID)
+			snap, snapErr := state.Snapshot()
+			require.NoError(t, snapErr)
+
+			vol, volErr := snap.CSIVolumeByID(nil, ns, volID)
 			require.NoError(t, volErr)
 			require.NotNil(t, vol)
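The ordering the endpoint changes above rely on — persist the claim checkpoint first, then take a fresh state snapshot before each subsequent read — can be shown in isolation. Below is a minimal sketch using hypothetical stand-in types (`store`, `claimState`, `unpublish`), not Nomad's real state store or RPC handlers: it illustrates why checkpointing before the controller unpublish preserves the claim state even when that unpublish fails, so the volume can be freed on retry instead of waiting for garbage collection.

```go
package main

import (
	"errors"
	"fmt"
)

// claimState is a stand-in for Nomad's CSIVolumeClaimState values.
type claimState int

const (
	stateTaken claimState = iota
	stateNodeDetached
	stateFree
)

// store is a stand-in for the server's FSM state: checkpoint models a Raft
// apply, and snapshot models taking a point-in-time read view.
type store struct{ state claimState }

func (s *store) checkpoint(next claimState) error {
	s.state = next
	return nil
}

func (s *store) snapshot() store { return *s }

// unpublish mirrors the ordering in the patch: checkpoint the incoming claim
// immediately, node-unpublish and checkpoint again, then take a fresh
// snapshot before attempting the controller unpublish.
func unpublish(s *store, controllerErr error) error {
	// checkpoint first, so the initial past-claim state is persisted even
	// if a later step fails (the bug this patch fixes)
	if err := s.checkpoint(stateTaken); err != nil {
		return err
	}

	// node unpublish work would happen here ...
	if err := s.checkpoint(stateNodeDetached); err != nil {
		return err
	}

	// reads after a checkpoint must come from a fresh snapshot, or they
	// can miss the state just written
	snap := s.snapshot()
	if snap.state != stateNodeDetached {
		return errors.New("read a stale snapshot")
	}

	if controllerErr != nil {
		// the node-detached claim is already durable, so a later
		// Unpublish retry can resume from here instead of waiting for GC
		return fmt.Errorf("controller unpublish: %w", controllerErr)
	}
	return s.checkpoint(stateFree)
}

func main() {
	s := &store{}
	err := unpublish(s, errors.New("plugin unavailable"))
	fmt.Println(err)     // controller unpublish: plugin unavailable
	fmt.Println(s.state) // 1 (stateNodeDetached): the claim survived the failure
}
```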