Skip to content

Commit

Permalink
csi: use denormalized past claims for node unpublish RPC
Browse files Browse the repository at this point in the history
Remove logic in the node unpublish RPC that now lives in the state
store's `CSIVolumeDenormalize` method.

(4/4 refactoring commits)
  • Loading branch information
tgross committed Jan 27, 2022
1 parent 7c34948 commit f223bf7
Showing 1 changed file with 12 additions and 26 deletions.
38 changes: 12 additions & 26 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,39 +615,25 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C
return v.checkpointClaim(vol, claim)
}

// The RPC sent from the 'nomad node detach' command won't have an
// The RPC sent from the 'nomad node detach' command or GC won't have an
// allocation ID set so we try to unpublish every terminal or invalid
// alloc on the node
allocIDs := []string{}
// alloc on the node, all of which will be in PastClaims after denormalizing
state := v.srv.fsm.State()
vol, err := state.CSIVolumeDenormalize(memdb.NewWatchSet(), vol)
if err != nil {
return err
}
for allocID, alloc := range vol.ReadAllocs {
if alloc == nil {
rclaim, ok := vol.ReadClaims[allocID]
if ok && rclaim.NodeID == claim.NodeID {
allocIDs = append(allocIDs, allocID)
}
} else if alloc.NodeID == claim.NodeID && alloc.TerminalStatus() {
allocIDs = append(allocIDs, allocID)
}
}
for allocID, alloc := range vol.WriteAllocs {
if alloc == nil {
wclaim, ok := vol.WriteClaims[allocID]
if ok && wclaim.NodeID == claim.NodeID {
allocIDs = append(allocIDs, allocID)
}
} else if alloc.NodeID == claim.NodeID && alloc.TerminalStatus() {
allocIDs = append(allocIDs, allocID)

claimsToUnpublish := []*structs.CSIVolumeClaim{}
for _, pastClaim := range vol.PastClaims {
if claim.NodeID == pastClaim.NodeID {
claimsToUnpublish = append(claimsToUnpublish, pastClaim)
}
}

var merr multierror.Error
for _, allocID := range allocIDs {
claim.AllocationID = allocID
err := v.nodeUnpublishVolumeImpl(vol, claim)
for _, pastClaim := range claimsToUnpublish {
err := v.nodeUnpublishVolumeImpl(vol, pastClaim)
if err != nil {
merr.Errors = append(merr.Errors, err)
}
Expand All @@ -668,8 +654,8 @@ func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *struc
ExternalID: vol.RemoteID(),
AllocID: claim.AllocationID,
NodeID: claim.NodeID,
AttachmentMode: vol.AttachmentMode,
AccessMode: vol.AccessMode,
AttachmentMode: claim.AttachmentMode,
AccessMode: claim.AccessMode,
ReadOnly: claim.Mode == structs.CSIVolumeClaimRead,
}
err := v.srv.RPC("ClientCSI.NodeDetachVolume",
Expand Down

0 comments on commit f223bf7

Please sign in to comment.