Skip to content

Commit

Permalink
csi: move volume claim release into volumewatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
tgross committed Apr 27, 2020
1 parent 8af65c5 commit 75ed335
Show file tree
Hide file tree
Showing 24 changed files with 1,789 additions and 523 deletions.
10 changes: 3 additions & 7 deletions e2e/csi/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,11 @@ func (tc *CSIVolumesTest) TestEBSVolumeClaim(f *framework.F) {
_, err = readFile(nomadClient, writeAlloc, expectedPath)
require.NoError(err)

// Shutdown the writer so we can run a reader.
// Shutdown (and purge) the writer so we can run a reader.
// we could mount the EBS volume with multi-attach, but we
// want this test to exercise the unpublish workflow.
//
// TODO(tgross): we should pass true here to run the equivalent
// of 'nomad job stop -purge' but this makes the test really
// racy. Once the unmount hang problem with -purge is fixed,
// we can restore this.
nomadClient.Jobs().Deregister(writeJobID, false, nil)
// this runs the equivalent of 'nomad job stop -purge'
nomadClient.Jobs().Deregister(writeJobID, true, nil)
// instead of waiting for the alloc to stop, wait for the volume claim gc run
require.Eventuallyf(func() bool {
vol, _, err := nomadClient.CSIVolumes().Info(volID, nil)
Expand Down
210 changes: 13 additions & 197 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import (

log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror"
version "github.com/hashicorp/go-version"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
Expand Down Expand Up @@ -711,212 +709,30 @@ func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime time.Time,
return timeDiff > interval.Nanoseconds()
}

// TODO: we need a periodic trigger to iterate over all the volumes and split
// them up into separate work items, same as we do for jobs.

// csiVolumeClaimGC is used to garbage collect CSI volume claims
func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
c.logger.Trace("garbage collecting unclaimed CSI volume claims")

// Volume ID smuggled in with the eval's own JobID
evalVolID := strings.Split(eval.JobID, ":")
if len(evalVolID) != 3 {

// COMPAT(1.0): 0.11.0 shipped with 3 fields. tighten this check to len == 2
if len(evalVolID) < 2 {
c.logger.Error("volume gc called without volID")
return nil
}

volID := evalVolID[1]
runningAllocs := evalVolID[2] == "purge"
return volumeClaimReap(c.srv, volID, eval.Namespace,
c.srv.config.Region, eval.LeaderACL, runningAllocs)
}

func volumeClaimReap(srv RPCServer, volID, namespace, region, leaderACL string, runningAllocs bool) error {

ws := memdb.NewWatchSet()

vol, err := srv.State().CSIVolumeByID(ws, namespace, volID)
if err != nil {
return err
}
if vol == nil {
return nil
}
vol, err = srv.State().CSIVolumeDenormalize(ws, vol)
if err != nil {
return err
}

plug, err := srv.State().CSIPluginByID(ws, vol.PluginID)
if err != nil {
return err
}

nodeClaims := collectClaimsToGCImpl(vol, runningAllocs)

var result *multierror.Error
for _, claim := range vol.PastClaims {
nodeClaims, err = volumeClaimReapImpl(srv,
&volumeClaimReapArgs{
vol: vol,
plug: plug,
claim: claim,
namespace: namespace,
region: region,
leaderACL: leaderACL,
nodeClaims: nodeClaims,
},
)
if err != nil {
result = multierror.Append(result, err)
continue
}
req := &structs.CSIVolumeClaimRequest{
VolumeID: volID,
Claim: structs.CSIVolumeClaimRelease,
}
return result.ErrorOrNil()

}
req.Namespace = eval.Namespace
req.Region = c.srv.config.Region

func collectClaimsToGCImpl(vol *structs.CSIVolume, runningAllocs bool) map[string]int {
nodeClaims := map[string]int{} // node IDs -> count

collectFunc := func(allocs map[string]*structs.Allocation,
claims map[string]*structs.CSIVolumeClaim) {

for allocID, alloc := range allocs {
claim, ok := claims[allocID]
if !ok {
// COMPAT(1.0): the CSIVolumeClaim fields were added
// after 0.11.1, so claims made before that may be
// missing this value. note that we'll have non-nil
// allocs here because we called denormalize on the
// value.
claim = &structs.CSIVolumeClaim{
AllocationID: allocID,
NodeID: alloc.NodeID,
State: structs.CSIVolumeClaimStateTaken,
}
}
nodeClaims[claim.NodeID]++
if runningAllocs || alloc.Terminated() {
// only overwrite the PastClaim if this is new,
// so that we can track state between subsequent calls
if _, exists := vol.PastClaims[claim.AllocationID]; !exists {
claim.State = structs.CSIVolumeClaimStateTaken
vol.PastClaims[claim.AllocationID] = claim
}
}
}
}

collectFunc(vol.WriteAllocs, vol.WriteClaims)
collectFunc(vol.ReadAllocs, vol.ReadClaims)
return nodeClaims
}

type volumeClaimReapArgs struct {
vol *structs.CSIVolume
plug *structs.CSIPlugin
claim *structs.CSIVolumeClaim
region string
namespace string
leaderACL string
nodeClaims map[string]int // node IDs -> count
}

func volumeClaimReapImpl(srv RPCServer, args *volumeClaimReapArgs) (map[string]int, error) {
vol := args.vol
claim := args.claim

var err error
var nReq *cstructs.ClientCSINodeDetachVolumeRequest

checkpoint := func(claimState structs.CSIVolumeClaimState) error {
req := &structs.CSIVolumeClaimRequest{
VolumeID: vol.ID,
AllocationID: claim.AllocationID,
Claim: structs.CSIVolumeClaimRelease,
WriteRequest: structs.WriteRequest{
Region: args.region,
Namespace: args.namespace,
AuthToken: args.leaderACL,
},
}
return srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{})
}

// previous checkpoints may have set the past claim state already.
// in practice we should never see CSIVolumeClaimStateControllerDetached
// but having an option for the state makes it easy to add a checkpoint
// in a backwards compatible way if we need one later
switch claim.State {
case structs.CSIVolumeClaimStateNodeDetached:
goto NODE_DETACHED
case structs.CSIVolumeClaimStateControllerDetached:
goto RELEASE_CLAIM
case structs.CSIVolumeClaimStateReadyToFree:
goto RELEASE_CLAIM
}

// (1) NodePublish / NodeUnstage must be completed before controller
// operations or releasing the claim.
nReq = &cstructs.ClientCSINodeDetachVolumeRequest{
PluginID: args.plug.ID,
VolumeID: vol.ID,
ExternalID: vol.RemoteID(),
AllocID: claim.AllocationID,
NodeID: claim.NodeID,
AttachmentMode: vol.AttachmentMode,
AccessMode: vol.AccessMode,
ReadOnly: claim.Mode == structs.CSIVolumeClaimRead,
}
err = srv.RPC("ClientCSI.NodeDetachVolume", nReq,
&cstructs.ClientCSINodeDetachVolumeResponse{})
if err != nil {
return args.nodeClaims, err
}
err = checkpoint(structs.CSIVolumeClaimStateNodeDetached)
if err != nil {
return args.nodeClaims, err
}

NODE_DETACHED:
args.nodeClaims[claim.NodeID]--

// (2) we only emit the controller unpublish if no other allocs
// on the node need it, but we also only want to make this
// call at most once per node
if vol.ControllerRequired && args.nodeClaims[claim.NodeID] < 1 {

// we need to get the CSI Node ID, which is not the same as
// the Nomad Node ID
ws := memdb.NewWatchSet()
targetNode, err := srv.State().NodeByID(ws, claim.NodeID)
if err != nil {
return args.nodeClaims, err
}
if targetNode == nil {
return args.nodeClaims, fmt.Errorf("%s: %s",
structs.ErrUnknownNodePrefix, claim.NodeID)
}
targetCSIInfo, ok := targetNode.CSINodePlugins[args.plug.ID]
if !ok {
return args.nodeClaims, fmt.Errorf("Failed to find NodeInfo for node: %s", targetNode.ID)
}

cReq := &cstructs.ClientCSIControllerDetachVolumeRequest{
VolumeID: vol.RemoteID(),
ClientCSINodeID: targetCSIInfo.NodeInfo.ID,
}
cReq.PluginID = args.plug.ID
err = srv.RPC("ClientCSI.ControllerDetachVolume", cReq,
&cstructs.ClientCSIControllerDetachVolumeResponse{})
if err != nil {
return args.nodeClaims, err
}
}

RELEASE_CLAIM:
// (3) release the claim from the state store, allowing it to be rescheduled
err = checkpoint(structs.CSIVolumeClaimStateReadyToFree)
if err != nil {
return args.nodeClaims, err
}
return args.nodeClaims, nil
err := c.srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{})
return err
}
Loading

0 comments on commit 75ed335

Please sign in to comment.