Skip to content

Commit

Permalink
csi: checkpoint volume claim garbage collection
Browse files Browse the repository at this point in the history
Adds a `CSIVolumeClaim` type to be tracked as current and past claims
on a volume. Allows for a client RPC failure during node or controller
detachment without having to keep the allocation around after the
first garbage collection eval.

This changeset lays groundwork for moving the actual detachment RPCs
into a volume watching loop outside the GC eval.
  • Loading branch information
tgross committed Apr 23, 2020
1 parent 77c7ed3 commit d7f1a5e
Show file tree
Hide file tree
Showing 12 changed files with 269 additions and 130 deletions.
1 change: 1 addition & 0 deletions client/allocrunner/csi_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
req := &structs.CSIVolumeClaimRequest{
VolumeID: pair.request.Source,
AllocationID: c.alloc.ID,
NodeID: c.alloc.NodeID,
Claim: claimType,
}
req.Region = c.alloc.Job.Region
Expand Down
2 changes: 0 additions & 2 deletions client/pluginmanager/csimanager/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package csimanager

import (
"context"
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -47,7 +46,6 @@ func TestInstanceManager_Shutdown(t *testing.T) {
im.shutdownCtxCancelFn = cancelFn
im.shutdownCh = make(chan struct{})
im.updater = func(_ string, info *structs.CSIInfo) {
fmt.Println(info)
lock.Lock()
defer lock.Unlock()
pluginHealth = info.Healthy
Expand Down
132 changes: 78 additions & 54 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,17 +749,15 @@ func volumeClaimReap(srv RPCServer, volID, namespace, region, leaderACL string,
return err
}

gcClaims, nodeClaims := collectClaimsToGCImpl(vol, runningAllocs)
nodeClaims := collectClaimsToGCImpl(vol, runningAllocs)

var result *multierror.Error
for _, claim := range gcClaims {
for _, claim := range vol.PastClaims {
nodeClaims, err = volumeClaimReapImpl(srv,
&volumeClaimReapArgs{
vol: vol,
plug: plug,
allocID: claim.allocID,
nodeID: claim.nodeID,
mode: claim.mode,
claim: claim,
namespace: namespace,
region: region,
leaderACL: leaderACL,
Expand All @@ -775,48 +773,47 @@ func volumeClaimReap(srv RPCServer, volID, namespace, region, leaderACL string,

}

type gcClaimRequest struct {
allocID string
nodeID string
mode structs.CSIVolumeClaimMode
}

func collectClaimsToGCImpl(vol *structs.CSIVolume, runningAllocs bool) ([]gcClaimRequest, map[string]int) {
gcAllocs := []gcClaimRequest{}
func collectClaimsToGCImpl(vol *structs.CSIVolume, runningAllocs bool) map[string]int {
nodeClaims := map[string]int{} // node IDs -> count

collectFunc := func(allocs map[string]*structs.Allocation,
mode structs.CSIVolumeClaimMode) {
for _, alloc := range allocs {
// we call denormalize on the volume above to populate
// Allocation pointers. But the alloc might have been
// garbage collected concurrently, so if the alloc is
// still nil we can safely skip it.
if alloc == nil {
continue
claims map[string]*structs.CSIVolumeClaim) {

for allocID, alloc := range allocs {
claim, ok := claims[allocID]
if !ok {
// COMPAT(1.0): the CSIVolumeClaim fields were added
// after 0.11.1, so claims made before that may be
// missing this value. note that we'll have non-nil
// allocs here because we called denormalize on the
// value.
claim = &structs.CSIVolumeClaim{
AllocationID: allocID,
NodeID: alloc.NodeID,
State: structs.CSIVolumeClaimStateTaken,
}
}
nodeClaims[alloc.NodeID]++
nodeClaims[claim.NodeID]++
if runningAllocs || alloc.Terminated() {
gcAllocs = append(gcAllocs, gcClaimRequest{
allocID: alloc.ID,
nodeID: alloc.NodeID,
mode: mode,
})
// only overwrite the PastClaim if this is new,
// so that we can track state between subsequent calls
if _, exists := vol.PastClaims[claim.AllocationID]; !exists {
claim.State = structs.CSIVolumeClaimStateTaken
vol.PastClaims[claim.AllocationID] = claim
}
}
}
}

collectFunc(vol.WriteAllocs, structs.CSIVolumeClaimWrite)
collectFunc(vol.ReadAllocs, structs.CSIVolumeClaimRead)
return gcAllocs, nodeClaims
collectFunc(vol.WriteAllocs, vol.WriteClaims)
collectFunc(vol.ReadAllocs, vol.ReadClaims)
return nodeClaims
}

type volumeClaimReapArgs struct {
vol *structs.CSIVolume
plug *structs.CSIPlugin
allocID string
nodeID string
mode structs.CSIVolumeClaimMode
claim *structs.CSIVolumeClaim
region string
namespace string
leaderACL string
Expand All @@ -825,42 +822,78 @@ type volumeClaimReapArgs struct {

func volumeClaimReapImpl(srv RPCServer, args *volumeClaimReapArgs) (map[string]int, error) {
vol := args.vol
nodeID := args.nodeID
claim := args.claim

var err error
var nReq *cstructs.ClientCSINodeDetachVolumeRequest

checkpoint := func(claimState structs.CSIVolumeClaimState) error {
req := &structs.CSIVolumeClaimRequest{
VolumeID: vol.ID,
AllocationID: claim.AllocationID,
Claim: structs.CSIVolumeClaimRelease,
WriteRequest: structs.WriteRequest{
Region: args.region,
Namespace: args.namespace,
AuthToken: args.leaderACL,
},
}
return srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{})
}

// previous checkpoints may have set the past claim state already.
// in practice we should never see CSIVolumeClaimStateControllerDetached
// but having an option for the state makes it easy to add a checkpoint
// in a backwards compatible way if we need one later
switch claim.State {
case structs.CSIVolumeClaimStateNodeDetached:
goto NODE_DETACHED
case structs.CSIVolumeClaimStateControllerDetached:
goto RELEASE_CLAIM
case structs.CSIVolumeClaimStateReadyToFree:
goto RELEASE_CLAIM
}

// (1) NodePublish / NodeUnstage must be completed before controller
// operations or releasing the claim.
nReq := &cstructs.ClientCSINodeDetachVolumeRequest{
nReq = &cstructs.ClientCSINodeDetachVolumeRequest{
PluginID: args.plug.ID,
VolumeID: vol.ID,
ExternalID: vol.RemoteID(),
AllocID: args.allocID,
NodeID: nodeID,
AllocID: claim.AllocationID,
NodeID: claim.NodeID,
AttachmentMode: vol.AttachmentMode,
AccessMode: vol.AccessMode,
ReadOnly: args.mode == structs.CSIVolumeClaimRead,
ReadOnly: claim.Mode == structs.CSIVolumeClaimRead,
}
err := srv.RPC("ClientCSI.NodeDetachVolume", nReq,
err = srv.RPC("ClientCSI.NodeDetachVolume", nReq,
&cstructs.ClientCSINodeDetachVolumeResponse{})
if err != nil {
return args.nodeClaims, err
}
args.nodeClaims[nodeID]--
err = checkpoint(structs.CSIVolumeClaimStateNodeDetached)
if err != nil {
return args.nodeClaims, err
}

NODE_DETACHED:
args.nodeClaims[claim.NodeID]--

// (2) we only emit the controller unpublish if no other allocs
// on the node need it, but we also only want to make this
// call at most once per node
if vol.ControllerRequired && args.nodeClaims[nodeID] < 1 {
if vol.ControllerRequired && args.nodeClaims[claim.NodeID] < 1 {

// we need to get the CSI Node ID, which is not the same as
// the Nomad Node ID
ws := memdb.NewWatchSet()
targetNode, err := srv.State().NodeByID(ws, nodeID)
targetNode, err := srv.State().NodeByID(ws, claim.NodeID)
if err != nil {
return args.nodeClaims, err
}
if targetNode == nil {
return args.nodeClaims, fmt.Errorf("%s: %s",
structs.ErrUnknownNodePrefix, nodeID)
structs.ErrUnknownNodePrefix, claim.NodeID)
}
targetCSIInfo, ok := targetNode.CSINodePlugins[args.plug.ID]
if !ok {
Expand All @@ -879,18 +912,9 @@ func volumeClaimReapImpl(srv RPCServer, args *volumeClaimReapArgs) (map[string]i
}
}

RELEASE_CLAIM:
// (3) release the claim from the state store, allowing it to be rescheduled
req := &structs.CSIVolumeClaimRequest{
VolumeID: vol.ID,
AllocationID: args.allocID,
Claim: structs.CSIVolumeClaimRelease,
WriteRequest: structs.WriteRequest{
Region: args.region,
Namespace: args.namespace,
AuthToken: args.leaderACL,
},
}
err = srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{})
err = checkpoint(structs.CSIVolumeClaimStateReadyToFree)
if err != nil {
return args.nodeClaims, err
}
Expand Down
62 changes: 30 additions & 32 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2286,12 +2286,22 @@ func TestCSI_GCVolumeClaims_Collection(t *testing.T) {
require.NoError(t, err)

// Claim the volumes and verify the claims were set
err = state.CSIVolumeClaim(index, ns, volId0, alloc1, structs.CSIVolumeClaimWrite)
err = state.CSIVolumeClaim(index, ns, volId0, &structs.CSIVolumeClaim{
AllocationID: alloc1.ID,
NodeID: alloc1.NodeID,
Mode: structs.CSIVolumeClaimWrite,
})
index++
require.NoError(t, err)
err = state.CSIVolumeClaim(index, ns, volId0, alloc2, structs.CSIVolumeClaimRead)

err = state.CSIVolumeClaim(index, ns, volId0, &structs.CSIVolumeClaim{
AllocationID: alloc2.ID,
NodeID: alloc2.NodeID,
Mode: structs.CSIVolumeClaimRead,
})
index++
require.NoError(t, err)

vol, err = state.CSIVolumeByID(ws, ns, volId0)
require.NoError(t, err)
require.Len(t, vol.ReadAllocs, 1)
Expand All @@ -2306,9 +2316,9 @@ func TestCSI_GCVolumeClaims_Collection(t *testing.T) {
vol, err = state.CSIVolumeDenormalize(ws, vol)
require.NoError(t, err)

gcClaims, nodeClaims := collectClaimsToGCImpl(vol, false)
nodeClaims := collectClaimsToGCImpl(vol, false)
require.Equal(t, nodeClaims[node.ID], 2)
require.Len(t, gcClaims, 2)
require.Len(t, vol.PastClaims, 2)
}

func TestCSI_GCVolumeClaims_Reap(t *testing.T) {
Expand All @@ -2326,7 +2336,6 @@ func TestCSI_GCVolumeClaims_Reap(t *testing.T) {

cases := []struct {
Name string
Claim gcClaimRequest
ClaimsCount map[string]int
ControllerRequired bool
ExpectedErr string
Expand All @@ -2338,12 +2347,7 @@ func TestCSI_GCVolumeClaims_Reap(t *testing.T) {
srv *MockRPCServer
}{
{
Name: "NodeDetachVolume fails",
Claim: gcClaimRequest{
allocID: alloc.ID,
nodeID: node.ID,
mode: structs.CSIVolumeClaimRead,
},
Name: "NodeDetachVolume fails",
ClaimsCount: map[string]int{node.ID: 1},
ControllerRequired: true,
ExpectedErr: "node plugin missing",
Expand All @@ -2355,36 +2359,26 @@ func TestCSI_GCVolumeClaims_Reap(t *testing.T) {
},
},
{
Name: "ControllerDetachVolume no controllers",
Claim: gcClaimRequest{
allocID: alloc.ID,
nodeID: node.ID,
mode: structs.CSIVolumeClaimRead,
},
ClaimsCount: map[string]int{node.ID: 1},
ControllerRequired: true,
ExpectedErr: fmt.Sprintf(
"Unknown node: %s", node.ID),
Name: "ControllerDetachVolume no controllers",
ClaimsCount: map[string]int{node.ID: 1},
ControllerRequired: true,
ExpectedErr: fmt.Sprintf("Unknown node: %s", node.ID),
ExpectedClaimsCount: 0,
ExpectedNodeDetachVolumeCount: 1,
ExpectedControllerDetachVolumeCount: 0,
ExpectedVolumeClaimCount: 1,
srv: &MockRPCServer{
state: s.State(),
},
},
{
Name: "ControllerDetachVolume node-only",
Claim: gcClaimRequest{
allocID: alloc.ID,
nodeID: node.ID,
mode: structs.CSIVolumeClaimRead,
},
Name: "ControllerDetachVolume node-only",
ClaimsCount: map[string]int{node.ID: 1},
ControllerRequired: false,
ExpectedClaimsCount: 0,
ExpectedNodeDetachVolumeCount: 1,
ExpectedControllerDetachVolumeCount: 0,
ExpectedVolumeClaimCount: 1,
ExpectedVolumeClaimCount: 2,
srv: &MockRPCServer{
state: s.State(),
},
Expand All @@ -2394,12 +2388,16 @@ func TestCSI_GCVolumeClaims_Reap(t *testing.T) {
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
vol.ControllerRequired = tc.ControllerRequired
claim := &structs.CSIVolumeClaim{
AllocationID: alloc.ID,
NodeID: node.ID,
State: structs.CSIVolumeClaimStateTaken,
Mode: structs.CSIVolumeClaimRead,
}
nodeClaims, err := volumeClaimReapImpl(tc.srv, &volumeClaimReapArgs{
vol: vol,
plug: plugin,
allocID: tc.Claim.allocID,
nodeID: tc.Claim.nodeID,
mode: tc.Claim.mode,
claim: claim,
region: "global",
namespace: "default",
leaderACL: "not-in-use",
Expand All @@ -2411,7 +2409,7 @@ func TestCSI_GCVolumeClaims_Reap(t *testing.T) {
require.NoError(err)
}
require.Equal(tc.ExpectedClaimsCount,
nodeClaims[tc.Claim.nodeID], "expected claims")
nodeClaims[claim.NodeID], "expected claims remaining")
require.Equal(tc.ExpectedNodeDetachVolumeCount,
tc.srv.countCSINodeDetachVolume, "node detach RPC count")
require.Equal(tc.ExpectedControllerDetachVolumeCount,
Expand Down
7 changes: 6 additions & 1 deletion nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,22 +400,27 @@ func (v *CSIVolume) controllerPublishVolume(req *structs.CSIVolumeClaimRequest,
return nil
}

// get Nomad's ID for the client node (not the storage provider's ID)
targetNode, err := state.NodeByID(ws, alloc.NodeID)
if err != nil {
return err
}
if targetNode == nil {
return fmt.Errorf("%s: %s", structs.ErrUnknownNodePrefix, alloc.NodeID)
}

// get the the storage provider's ID for the client node (not
// Nomad's ID for the node)
targetCSIInfo, ok := targetNode.CSINodePlugins[plug.ID]
if !ok {
return fmt.Errorf("Failed to find NodeInfo for node: %s", targetNode.ID)
}
externalNodeID := targetCSIInfo.NodeInfo.ID

method := "ClientCSI.ControllerAttachVolume"
cReq := &cstructs.ClientCSIControllerAttachVolumeRequest{
VolumeID: vol.RemoteID(),
ClientCSINodeID: targetCSIInfo.NodeInfo.ID,
ClientCSINodeID: externalNodeID,
AttachmentMode: vol.AttachmentMode,
AccessMode: vol.AccessMode,
ReadOnly: req.Claim == structs.CSIVolumeClaimRead,
Expand Down
Loading

0 comments on commit d7f1a5e

Please sign in to comment.