Skip to content

Commit

Permalink
CSI: move terminal alloc handling into denormalization (#11931)
Browse files Browse the repository at this point in the history
* The volume claim GC method and volumewatcher both have logic
collecting terminal allocations that duplicates most of the logic
that's now in the state store's `CSIVolumeDenormalize` method. Copy
this logic into the state store so that all code paths have the same
view of the past claims.
* Remove logic in the volume claim GC that now lives in the state
store's `CSIVolumeDenormalize` method.
* Remove logic in the volumewatcher that now lives in the state
store's `CSIVolumeDenormalize` method.
* Remove logic in the node unpublish RPC that now lives in the state
store's `CSIVolumeDenormalize` method.
  • Loading branch information
tgross committed Jan 28, 2022
1 parent 26b5008 commit 2c6de3e
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 70 deletions.
33 changes: 3 additions & 30 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,6 @@ func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
"index", oldThreshold,
"csi_volume_claim_gc_threshold", c.srv.config.CSIVolumeClaimGCThreshold)

NEXT_VOLUME:
for i := iter.Next(); i != nil; i = iter.Next() {
vol := i.(*structs.CSIVolume)

Expand All @@ -783,38 +782,12 @@ NEXT_VOLUME:
continue
}

// TODO(tgross): consider moving the TerminalStatus check into
// the denormalize volume logic so that we can just check the
// volume for past claims

// we only call the claim release RPC if the volume has claims
// that no longer have valid allocations. otherwise we'd send
// out a lot of do-nothing RPCs.
for id := range vol.ReadClaims {
alloc, err := c.snap.AllocByID(ws, id)
if err != nil {
return err
}
if alloc == nil || alloc.TerminalStatus() {
err = gcClaims(vol.Namespace, vol.ID)
if err != nil {
return err
}
goto NEXT_VOLUME
}
}
for id := range vol.WriteClaims {
alloc, err := c.snap.AllocByID(ws, id)
if err != nil {
return err
}
if alloc == nil || alloc.TerminalStatus() {
err = gcClaims(vol.Namespace, vol.ID)
if err != nil {
return err
}
goto NEXT_VOLUME
}
vol, err := c.snap.CSIVolumeDenormalize(ws, vol)
if err != nil {
return err
}
if len(vol.PastClaims) > 0 {
err = gcClaims(vol.Namespace, vol.ID)
Expand Down
38 changes: 12 additions & 26 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,39 +613,25 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C
return v.checkpointClaim(vol, claim)
}

// The RPC sent from the 'nomad node detach' command won't have an
// The RPC sent from the 'nomad node detach' command or GC won't have an
// allocation ID set so we try to unpublish every terminal or invalid
// alloc on the node
allocIDs := []string{}
// alloc on the node, all of which will be in PastClaims after denormalizing
state := v.srv.fsm.State()
vol, err := state.CSIVolumeDenormalize(memdb.NewWatchSet(), vol)
if err != nil {
return err
}
for allocID, alloc := range vol.ReadAllocs {
if alloc == nil {
rclaim, ok := vol.ReadClaims[allocID]
if ok && rclaim.NodeID == claim.NodeID {
allocIDs = append(allocIDs, allocID)
}
} else if alloc.NodeID == claim.NodeID && alloc.TerminalStatus() {
allocIDs = append(allocIDs, allocID)
}
}
for allocID, alloc := range vol.WriteAllocs {
if alloc == nil {
wclaim, ok := vol.WriteClaims[allocID]
if ok && wclaim.NodeID == claim.NodeID {
allocIDs = append(allocIDs, allocID)
}
} else if alloc.NodeID == claim.NodeID && alloc.TerminalStatus() {
allocIDs = append(allocIDs, allocID)

claimsToUnpublish := []*structs.CSIVolumeClaim{}
for _, pastClaim := range vol.PastClaims {
if claim.NodeID == pastClaim.NodeID {
claimsToUnpublish = append(claimsToUnpublish, pastClaim)
}
}

var merr multierror.Error
for _, allocID := range allocIDs {
claim.AllocationID = allocID
err := v.nodeUnpublishVolumeImpl(vol, claim)
for _, pastClaim := range claimsToUnpublish {
err := v.nodeUnpublishVolumeImpl(vol, pastClaim)
if err != nil {
merr.Errors = append(merr.Errors, err)
}
Expand All @@ -666,8 +652,8 @@ func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *struc
ExternalID: vol.RemoteID(),
AllocID: claim.AllocationID,
NodeID: claim.NodeID,
AttachmentMode: vol.AttachmentMode,
AccessMode: vol.AccessMode,
AttachmentMode: claim.AttachmentMode,
AccessMode: claim.AccessMode,
ReadOnly: claim.Mode == structs.CSIVolumeClaimRead,
}
err := v.srv.RPC("ClientCSI.NodeDetachVolume",
Expand Down
2 changes: 1 addition & 1 deletion nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2531,7 +2531,7 @@ func (s *StateStore) csiVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *st
}

currentAllocs[id] = a
if a == nil && pastClaim == nil {
if (a == nil || a.TerminalStatus()) && pastClaim == nil {
// the alloc is garbage collected or terminal but nothing has written a
// PastClaim, so create one now
pastClaim = &structs.CSIVolumeClaim{
Expand Down
15 changes: 3 additions & 12 deletions nomad/volumewatcher/volume_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,27 +177,18 @@ func (vw *volumeWatcher) isUnclaimed(vol *structs.CSIVolume) bool {
return len(vol.ReadClaims) == 0 && len(vol.WriteClaims) == 0 && len(vol.PastClaims) == 0
}

// volumeReapImpl unpublishes all the volume's PastClaims. PastClaims
// will be populated from nil or terminal allocs when we call
// CSIVolumeDenormalize(), so this assumes we've done so in the caller.
func (vw *volumeWatcher) volumeReapImpl(vol *structs.CSIVolume) error {

// PastClaims written by a volume GC core job will have no allocation,
// so we need to find out which allocs are eligible for cleanup.
for _, claim := range vol.PastClaims {
if claim.AllocationID == "" {
vol = vw.collectPastClaims(vol)
break // only need to collect once
}
}

var result *multierror.Error
for _, claim := range vol.PastClaims {
err := vw.unpublish(vol, claim)
if err != nil {
result = multierror.Append(result, err)
}
}

return result.ErrorOrNil()

}

func (vw *volumeWatcher) collectPastClaims(vol *structs.CSIVolume) *structs.CSIVolume {
Expand Down
4 changes: 4 additions & 0 deletions nomad/volumewatcher/volume_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestVolumeWatch_Reap(t *testing.T) {
logger: testlog.HCLogger(t),
}

vol, _ = srv.State().CSIVolumeDenormalize(nil, vol.Copy())
err := w.volumeReapImpl(vol)
require.NoError(err)

Expand All @@ -48,6 +49,7 @@ func TestVolumeWatch_Reap(t *testing.T) {
State: structs.CSIVolumeClaimStateNodeDetached,
},
}
vol, _ = srv.State().CSIVolumeDenormalize(nil, vol.Copy())
err = w.volumeReapImpl(vol)
require.NoError(err)
require.Len(vol.PastClaims, 1)
Expand All @@ -59,6 +61,7 @@ func TestVolumeWatch_Reap(t *testing.T) {
Mode: structs.CSIVolumeClaimGC,
},
}
vol, _ = srv.State().CSIVolumeDenormalize(nil, vol.Copy())
err = w.volumeReapImpl(vol)
require.NoError(err)
require.Len(vol.PastClaims, 2) // alloc claim + GC claim
Expand All @@ -71,6 +74,7 @@ func TestVolumeWatch_Reap(t *testing.T) {
Mode: structs.CSIVolumeClaimRead,
},
}
vol, _ = srv.State().CSIVolumeDenormalize(nil, vol.Copy())
err = w.volumeReapImpl(vol)
require.NoError(err)
require.Len(vol.PastClaims, 2) // alloc claim + GC claim
Expand Down
2 changes: 1 addition & 1 deletion nomad/volumewatcher/volumes_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {
plugin := mock.CSIPlugin()
node := testNode(plugin, srv.State())
alloc := mock.Alloc()
alloc.ClientStatus = structs.AllocClientStatusComplete
alloc.ClientStatus = structs.AllocClientStatusRunning
vol := testVolume(plugin, alloc, node.ID)

index++
Expand Down

0 comments on commit 2c6de3e

Please sign in to comment.