Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

csi: move volume claim release into volumewatcher #7708

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client/allocrunner/csi_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
req := &structs.CSIVolumeClaimRequest{
VolumeID: pair.request.Source,
AllocationID: c.alloc.ID,
NodeID: c.alloc.NodeID,
Claim: claimType,
}
req.Region = c.alloc.Job.Region
Expand Down
187 changes: 13 additions & 174 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import (

log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror"
version "github.com/hashicorp/go-version"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
Expand Down Expand Up @@ -711,188 +709,29 @@ func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime time.Time,
return timeDiff > interval.Nanoseconds()
}

// TODO: we need a periodic trigger to iterate over all the volumes and split
// them up into separate work items, same as we do for jobs.

// csiVolumeClaimGC is used to garbage collect CSI volume claims
func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
c.logger.Trace("garbage collecting unclaimed CSI volume claims")

// Volume ID smuggled in with the eval's own JobID
evalVolID := strings.Split(eval.JobID, ":")
if len(evalVolID) != 3 {
c.logger.Error("volume gc called without volID")
return nil
}

volID := evalVolID[1]
runningAllocs := evalVolID[2] == "purge"
return volumeClaimReap(c.srv, volID, eval.Namespace,
c.srv.config.Region, eval.LeaderACL, runningAllocs)
}
// TODO: why are we getting spurious volume GC evals showing up
// and getting failures?

func volumeClaimReap(srv RPCServer, volID, namespace, region, leaderACL string, runningAllocs bool) error {

ws := memdb.NewWatchSet()

vol, err := srv.State().CSIVolumeByID(ws, namespace, volID)
if err != nil {
return err
}
if vol == nil {
// COMPAT(1.0): 0.11.0 shipped with 3 fields. tighten this check to len == 2
if len(evalVolID) < 2 {
c.logger.Error("volume gc called without volID")
return nil
}
vol, err = srv.State().CSIVolumeDenormalize(ws, vol)
if err != nil {
return err
}

plug, err := srv.State().CSIPluginByID(ws, vol.PluginID)
if err != nil {
return err
}

gcClaims, nodeClaims := collectClaimsToGCImpl(vol, runningAllocs)

var result *multierror.Error
for _, claim := range gcClaims {
nodeClaims, err = volumeClaimReapImpl(srv,
&volumeClaimReapArgs{
vol: vol,
plug: plug,
allocID: claim.allocID,
nodeID: claim.nodeID,
mode: claim.mode,
namespace: namespace,
region: region,
leaderACL: leaderACL,
nodeClaims: nodeClaims,
},
)
if err != nil {
result = multierror.Append(result, err)
continue
}
}
return result.ErrorOrNil()

}

type gcClaimRequest struct {
allocID string
nodeID string
mode structs.CSIVolumeClaimMode
}

func collectClaimsToGCImpl(vol *structs.CSIVolume, runningAllocs bool) ([]gcClaimRequest, map[string]int) {
gcAllocs := []gcClaimRequest{}
nodeClaims := map[string]int{} // node IDs -> count

collectFunc := func(allocs map[string]*structs.Allocation,
mode structs.CSIVolumeClaimMode) {
for _, alloc := range allocs {
// we call denormalize on the volume above to populate
// Allocation pointers. But the alloc might have been
// garbage collected concurrently, so if the alloc is
// still nil we can safely skip it.
if alloc == nil {
continue
}
nodeClaims[alloc.NodeID]++
if runningAllocs || alloc.Terminated() {
gcAllocs = append(gcAllocs, gcClaimRequest{
allocID: alloc.ID,
nodeID: alloc.NodeID,
mode: mode,
})
}
}
}

collectFunc(vol.WriteAllocs, structs.CSIVolumeClaimWrite)
collectFunc(vol.ReadAllocs, structs.CSIVolumeClaimRead)
return gcAllocs, nodeClaims
}

type volumeClaimReapArgs struct {
vol *structs.CSIVolume
plug *structs.CSIPlugin
allocID string
nodeID string
mode structs.CSIVolumeClaimMode
region string
namespace string
leaderACL string
nodeClaims map[string]int // node IDs -> count
}

func volumeClaimReapImpl(srv RPCServer, args *volumeClaimReapArgs) (map[string]int, error) {
vol := args.vol
nodeID := args.nodeID

// (1) NodePublish / NodeUnstage must be completed before controller
// operations or releasing the claim.
nReq := &cstructs.ClientCSINodeDetachVolumeRequest{
PluginID: args.plug.ID,
VolumeID: vol.ID,
ExternalID: vol.RemoteID(),
AllocID: args.allocID,
NodeID: nodeID,
AttachmentMode: vol.AttachmentMode,
AccessMode: vol.AccessMode,
ReadOnly: args.mode == structs.CSIVolumeClaimRead,
}
err := srv.RPC("ClientCSI.NodeDetachVolume", nReq,
&cstructs.ClientCSINodeDetachVolumeResponse{})
if err != nil {
return args.nodeClaims, err
}
args.nodeClaims[nodeID]--

// (2) we only emit the controller unpublish if no other allocs
// on the node need it, but we also only want to make this
// call at most once per node
if vol.ControllerRequired && args.nodeClaims[nodeID] < 1 {

// we need to get the CSI Node ID, which is not the same as
// the Nomad Node ID
ws := memdb.NewWatchSet()
targetNode, err := srv.State().NodeByID(ws, nodeID)
if err != nil {
return args.nodeClaims, err
}
if targetNode == nil {
return args.nodeClaims, fmt.Errorf("%s: %s",
structs.ErrUnknownNodePrefix, nodeID)
}
targetCSIInfo, ok := targetNode.CSINodePlugins[args.plug.ID]
if !ok {
return args.nodeClaims, fmt.Errorf("Failed to find NodeInfo for node: %s", targetNode.ID)
}

cReq := &cstructs.ClientCSIControllerDetachVolumeRequest{
VolumeID: vol.RemoteID(),
ClientCSINodeID: targetCSIInfo.NodeInfo.ID,
}
cReq.PluginID = args.plug.ID
err = srv.RPC("ClientCSI.ControllerDetachVolume", cReq,
&cstructs.ClientCSIControllerDetachVolumeResponse{})
if err != nil {
return args.nodeClaims, err
}
}
volID := evalVolID[1]
req := &structs.CSIVolumeClaimRequest{VolumeID: volID}
req.Namespace = eval.Namespace
_, err := c.srv.volumeWatcher.Reap(req)
return err

// (3) release the claim from the state store, allowing it to be rescheduled
req := &structs.CSIVolumeClaimRequest{
VolumeID: vol.ID,
AllocationID: args.allocID,
Claim: structs.CSIVolumeClaimRelease,
WriteRequest: structs.WriteRequest{
Region: args.region,
Namespace: args.namespace,
AuthToken: args.leaderACL,
},
}
err = srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{})
if err != nil {
return args.nodeClaims, err
}
return args.nodeClaims, nil
}
Loading