Skip to content

Commit

Permalink
csi: move volume claim release into volumewatcher
Browse files Browse the repository at this point in the history
This changeset avoids tying up scheduling workers by immediately
sending volume claim release workloads into their own loop, rather
than blocking in the core GC job doing expensive things like talking
to CSI plugins.
  • Loading branch information
tgross committed Apr 21, 2020
1 parent dd2e387 commit 8f24510
Show file tree
Hide file tree
Showing 26 changed files with 1,535 additions and 524 deletions.
1 change: 1 addition & 0 deletions client/allocrunner/csi_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
req := &structs.CSIVolumeClaimRequest{
VolumeID: pair.request.Source,
AllocationID: c.alloc.ID,
NodeID: c.alloc.NodeID,
Claim: claimType,
}
req.Region = c.alloc.Job.Region
Expand Down
187 changes: 13 additions & 174 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import (

log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror"
version "github.com/hashicorp/go-version"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
Expand Down Expand Up @@ -711,188 +709,29 @@ func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime time.Time,
return timeDiff > interval.Nanoseconds()
}

// TODO: we need a periodic trigger to iterate over all the volumes and split
// them up into separate work items, same as we do for jobs.

// csiVolumeClaimGC is used to garbage collect CSI volume claims
func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
c.logger.Trace("garbage collecting unclaimed CSI volume claims")

// Volume ID smuggled in with the eval's own JobID
evalVolID := strings.Split(eval.JobID, ":")
if len(evalVolID) != 3 {
c.logger.Error("volume gc called without volID")
return nil
}

volID := evalVolID[1]
runningAllocs := evalVolID[2] == "purge"
return volumeClaimReap(c.srv, volID, eval.Namespace,
c.srv.config.Region, eval.LeaderACL, runningAllocs)
}
// TODO: why are we getting spurious volume GC evals showing up
// and getting failures?

func volumeClaimReap(srv RPCServer, volID, namespace, region, leaderACL string, runningAllocs bool) error {

ws := memdb.NewWatchSet()

vol, err := srv.State().CSIVolumeByID(ws, namespace, volID)
if err != nil {
return err
}
if vol == nil {
// COMPAT(1.0): 0.11.0 shipped with 3 fields. tighten this check to len == 2
if len(evalVolID) < 2 {
c.logger.Error("volume gc called without volID")
return nil
}
vol, err = srv.State().CSIVolumeDenormalize(ws, vol)
if err != nil {
return err
}

plug, err := srv.State().CSIPluginByID(ws, vol.PluginID)
if err != nil {
return err
}

gcClaims, nodeClaims := collectClaimsToGCImpl(vol, runningAllocs)

var result *multierror.Error
for _, claim := range gcClaims {
nodeClaims, err = volumeClaimReapImpl(srv,
&volumeClaimReapArgs{
vol: vol,
plug: plug,
allocID: claim.allocID,
nodeID: claim.nodeID,
mode: claim.mode,
namespace: namespace,
region: region,
leaderACL: leaderACL,
nodeClaims: nodeClaims,
},
)
if err != nil {
result = multierror.Append(result, err)
continue
}
}
return result.ErrorOrNil()

}

type gcClaimRequest struct {
allocID string
nodeID string
mode structs.CSIVolumeClaimMode
}

func collectClaimsToGCImpl(vol *structs.CSIVolume, runningAllocs bool) ([]gcClaimRequest, map[string]int) {
gcAllocs := []gcClaimRequest{}
nodeClaims := map[string]int{} // node IDs -> count

collectFunc := func(allocs map[string]*structs.Allocation,
mode structs.CSIVolumeClaimMode) {
for _, alloc := range allocs {
// we call denormalize on the volume above to populate
// Allocation pointers. But the alloc might have been
// garbage collected concurrently, so if the alloc is
// still nil we can safely skip it.
if alloc == nil {
continue
}
nodeClaims[alloc.NodeID]++
if runningAllocs || alloc.Terminated() {
gcAllocs = append(gcAllocs, gcClaimRequest{
allocID: alloc.ID,
nodeID: alloc.NodeID,
mode: mode,
})
}
}
}

collectFunc(vol.WriteAllocs, structs.CSIVolumeClaimWrite)
collectFunc(vol.ReadAllocs, structs.CSIVolumeClaimRead)
return gcAllocs, nodeClaims
}

type volumeClaimReapArgs struct {
vol *structs.CSIVolume
plug *structs.CSIPlugin
allocID string
nodeID string
mode structs.CSIVolumeClaimMode
region string
namespace string
leaderACL string
nodeClaims map[string]int // node IDs -> count
}

func volumeClaimReapImpl(srv RPCServer, args *volumeClaimReapArgs) (map[string]int, error) {
vol := args.vol
nodeID := args.nodeID

// (1) NodePublish / NodeUnstage must be completed before controller
// operations or releasing the claim.
nReq := &cstructs.ClientCSINodeDetachVolumeRequest{
PluginID: args.plug.ID,
VolumeID: vol.ID,
ExternalID: vol.RemoteID(),
AllocID: args.allocID,
NodeID: nodeID,
AttachmentMode: vol.AttachmentMode,
AccessMode: vol.AccessMode,
ReadOnly: args.mode == structs.CSIVolumeClaimRead,
}
err := srv.RPC("ClientCSI.NodeDetachVolume", nReq,
&cstructs.ClientCSINodeDetachVolumeResponse{})
if err != nil {
return args.nodeClaims, err
}
args.nodeClaims[nodeID]--

// (2) we only emit the controller unpublish if no other allocs
// on the node need it, but we also only want to make this
// call at most once per node
if vol.ControllerRequired && args.nodeClaims[nodeID] < 1 {

// we need to get the CSI Node ID, which is not the same as
// the Nomad Node ID
ws := memdb.NewWatchSet()
targetNode, err := srv.State().NodeByID(ws, nodeID)
if err != nil {
return args.nodeClaims, err
}
if targetNode == nil {
return args.nodeClaims, fmt.Errorf("%s: %s",
structs.ErrUnknownNodePrefix, nodeID)
}
targetCSIInfo, ok := targetNode.CSINodePlugins[args.plug.ID]
if !ok {
return args.nodeClaims, fmt.Errorf("Failed to find NodeInfo for node: %s", targetNode.ID)
}

cReq := &cstructs.ClientCSIControllerDetachVolumeRequest{
VolumeID: vol.RemoteID(),
ClientCSINodeID: targetCSIInfo.NodeInfo.ID,
}
cReq.PluginID = args.plug.ID
err = srv.RPC("ClientCSI.ControllerDetachVolume", cReq,
&cstructs.ClientCSIControllerDetachVolumeResponse{})
if err != nil {
return args.nodeClaims, err
}
}
volID := evalVolID[1]
req := &structs.CSIVolumeClaimRequest{VolumeID: volID}
req.Namespace = eval.Namespace
_, err := c.srv.volumeWatcher.Reap(req)
return err

// (3) release the claim from the state store, allowing it to be rescheduled
req := &structs.CSIVolumeClaimRequest{
VolumeID: vol.ID,
AllocationID: args.allocID,
Claim: structs.CSIVolumeClaimRelease,
WriteRequest: structs.WriteRequest{
Region: args.region,
Namespace: args.namespace,
AuthToken: args.leaderACL,
},
}
err = srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{})
if err != nil {
return args.nodeClaims, err
}
return args.nodeClaims, nil
}
Loading

0 comments on commit 8f24510

Please sign in to comment.