csi: add unpublish RPC (#8572)
This changeset is plumbing for a `nomad volume detach` command; the new unpublish RPC will
also be reused by the volumewatcher claim GC.
tgross authored Aug 6, 2020
1 parent 07ff0b9 commit acc1c0b
Showing 3 changed files with 350 additions and 14 deletions.
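The new `CSIVolume.Unpublish` RPC is the server-side building block for that `nomad volume detach` command. A minimal sketch of how a caller might drive it, modeled on the test added in this commit (`codec`, `volID`, `allocID`, `nodeID`, and `secretID` are placeholders supplied by the caller; the real CLI and volumewatcher callers will wire this up differently):

    // Sketch only: release one allocation's claim on a volume.
    req := &structs.CSIVolumeUnpublishRequest{
        VolumeID: volID,
        Claim: &structs.CSIVolumeClaim{
            AllocationID: allocID,
            NodeID:       nodeID,
            Mode:         structs.CSIVolumeClaimRead,
        },
        WriteRequest: structs.WriteRequest{
            Region:    "global",
            Namespace: structs.DefaultNamespace,
            // the token needs csi-mount-volume on the namespace plus plugin:read
            AuthToken: secretID,
        },
    }
    err := msgpackrpc.CallWithCodec(codec, "CSIVolume.Unpublish",
        req, &structs.CSIVolumeUnpublishResponse{})

The endpoint forces the claim into release mode itself, so a caller only has to identify the volume, allocation, and node.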
225 changes: 224 additions & 1 deletion nomad/csi_endpoint.go
@@ -1,6 +1,7 @@
package nomad

import (
"errors"
"fmt"
"time"

@@ -392,6 +393,7 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS
return fmt.Errorf("controller publish: %v", err)
}
}

resp, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, args)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "claim")
@@ -448,9 +450,10 @@ func (v *CSIVolume) controllerPublishVolume(req *structs.CSIVolumeClaimRequest,
// Nomad's ID for the node)
targetCSIInfo, ok := targetNode.CSINodePlugins[plug.ID]
if !ok {
return fmt.Errorf("Failed to find NodeInfo for node: %s", targetNode.ID)
return fmt.Errorf("failed to find storage provider info for client %q, node plugin %q is not running or has not fingerprinted on this client", targetNode.ID, plug.ID)
}
externalNodeID := targetCSIInfo.NodeInfo.ID
req.ExternalNodeID = externalNodeID // update with the target info

method := "ClientCSI.ControllerAttachVolume"
cReq := &cstructs.ClientCSIControllerAttachVolumeRequest{
@@ -507,6 +510,226 @@ func allowCSIMount(aclObj *acl.ACL, namespace string) bool {
aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityCSIMountVolume)
}

// Unpublish synchronously sends the NodeUnpublish, NodeUnstage, and
// ControllerUnpublish RPCs to the client. It handles errors according to the
// current claim state.
func (v *CSIVolume) Unpublish(args *structs.CSIVolumeUnpublishRequest, reply *structs.CSIVolumeUnpublishResponse) error {
if done, err := v.srv.forward("CSIVolume.Unpublish", args, args, reply); done {
return err
}

metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "volume", "unpublish"}, metricsStart)

// TODO(tgross): ensure we have pass-thru of token for client-driven RPC
// ref https://github.com/hashicorp/nomad/issues/8373
allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIMountVolume)
aclObj, err := v.srv.WriteACLObj(&args.WriteRequest, true)
if err != nil {
return err
}
if !allowVolume(aclObj, args.RequestNamespace()) || !aclObj.AllowPluginRead() {
return structs.ErrPermissionDenied
}

if args.VolumeID == "" {
return fmt.Errorf("missing volume ID")
}
if args.Claim == nil {
return fmt.Errorf("missing volume claim")
}

ws := memdb.NewWatchSet()
state := v.srv.fsm.State()
vol, err := state.CSIVolumeByID(ws, args.Namespace, args.VolumeID)
if err != nil {
return err
}
if vol == nil {
return fmt.Errorf("no such volume")
}

claim := args.Claim
claim.Mode = structs.CSIVolumeClaimRelease

// we send a controller detach if a Nomad client no longer has
// any claim to the volume, so track the counts here
var nodeClaims int
for _, alloc := range vol.ReadAllocs {
if alloc != nil && alloc.NodeID == claim.NodeID {
nodeClaims++
}
}
for _, alloc := range vol.WriteAllocs {
if alloc != nil && alloc.NodeID == claim.NodeID {
nodeClaims++
}
}

// previous checkpoints may have set the past claim state already.
// in practice we should never see CSIVolumeClaimStateControllerDetached
// but having an option for the state makes it easy to add a checkpoint
// in a backwards compatible way if we need one later
switch claim.State {
case structs.CSIVolumeClaimStateNodeDetached:
goto NODE_DETACHED
case structs.CSIVolumeClaimStateControllerDetached:
goto RELEASE_CLAIM
case structs.CSIVolumeClaimStateReadyToFree:
goto RELEASE_CLAIM
}
err = v.nodeUnpublishVolume(vol, claim)
if err != nil {
return err
}

NODE_DETACHED:
nodeClaims--
err = v.controllerUnpublishVolume(vol, claim, nodeClaims)
if err != nil {
return err
}

RELEASE_CLAIM:
// advance a CSIVolumeClaimStateControllerDetached claim
claim.State = structs.CSIVolumeClaimStateReadyToFree
err = v.checkpointClaim(vol, claim)
if err != nil {
return err
}

reply.Index = vol.ModifyIndex
v.srv.setQueryMeta(&reply.QueryMeta)
return nil
}

func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
req := &cstructs.ClientCSINodeDetachVolumeRequest{
PluginID: vol.PluginID,
VolumeID: vol.ID,
ExternalID: vol.RemoteID(),
AllocID: claim.AllocationID,
NodeID: claim.NodeID,
AttachmentMode: vol.AttachmentMode,
AccessMode: vol.AccessMode,
ReadOnly: claim.Mode == structs.CSIVolumeClaimRead,
}
err := v.srv.RPC("ClientCSI.NodeDetachVolume",
req, &cstructs.ClientCSINodeDetachVolumeResponse{})
if err != nil {
// we should only get this error if the Nomad node disconnects and
// is garbage-collected, so at this point we don't have any reason
// to operate as though the volume is attached to it.
if !errors.Is(err, fmt.Errorf("Unknown node: %s", claim.NodeID)) {
// TODO(tgross): need to capture case where NodeUnpublish previously
// happened but we failed to checkpoint for some reason
return fmt.Errorf("could not detach from node: %w", err)
}
}
claim.State = structs.CSIVolumeClaimStateNodeDetached
return v.checkpointClaim(vol, claim)
}

func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim, nodeClaims int) error {

// we can drop the claim without sending the controller detach if
// another node has a claim on the volume
if !vol.ControllerRequired || nodeClaims >= 1 {
claim.State = structs.CSIVolumeClaimStateReadyToFree
return nil
}

// if the RPC is sent by a client node, it doesn't know the claim's
// external node ID.
if claim.ExternalNodeID == "" {
externalNodeID, err := v.lookupExternalNodeID(vol, claim)
if err != nil {
return fmt.Errorf("missing external node ID: %v", err)
}
claim.ExternalNodeID = externalNodeID
}

req := &cstructs.ClientCSIControllerDetachVolumeRequest{
VolumeID: vol.RemoteID(),
ClientCSINodeID: claim.ExternalNodeID,
Secrets: vol.Secrets,
}
req.PluginID = vol.PluginID
err := v.srv.RPC("ClientCSI.ControllerDetachVolume", req,
&cstructs.ClientCSIControllerDetachVolumeResponse{})
if err != nil {
// TODO(tgross): need to capture case where ControllerUnpublish previously
// happened but we failed to checkpoint for some reason
return fmt.Errorf("could not detach from controller: %v", err)
}
claim.State = structs.CSIVolumeClaimStateReadyToFree
return v.checkpointClaim(vol, claim)
}

// lookupExternalNodeID gets the CSI plugin's ID for a node. We check the
// volume's claims first; the node fingerprint lookup is the last resort
// because the client may have been stopped and GC'd by this point.
func (v *CSIVolume) lookupExternalNodeID(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) (string, error) {
for _, rClaim := range vol.ReadClaims {
if rClaim.NodeID == claim.NodeID {
return rClaim.ExternalNodeID, nil
}
}
for _, wClaim := range vol.WriteClaims {
if wClaim.NodeID == claim.NodeID {
return wClaim.ExternalNodeID, nil
}
}
for _, pClaim := range vol.PastClaims {
if pClaim.NodeID == claim.NodeID {
return pClaim.ExternalNodeID, nil
}
}

// fallback to looking up the node plugin
ws := memdb.NewWatchSet()
state := v.srv.fsm.State()
targetNode, err := state.NodeByID(ws, claim.NodeID)
if err != nil {
return "", err
}
if targetNode == nil {
return "", fmt.Errorf("%s: %s", structs.ErrUnknownNodePrefix, claim.NodeID)
}

// get the storage provider's ID for the client node (not
// Nomad's ID for the node)
targetCSIInfo, ok := targetNode.CSINodePlugins[vol.PluginID]
if !ok {
return "", fmt.Errorf("failed to find storage provider info for client %q, node plugin %q is not running or has not fingerprinted on this client", targetNode.ID, vol.PluginID)
}
return targetCSIInfo.NodeInfo.ID, nil
}

func (v *CSIVolume) checkpointClaim(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
v.logger.Trace("checkpointing claim")
req := structs.CSIVolumeClaimRequest{
VolumeID: vol.ID,
AllocationID: claim.AllocationID,
NodeID: claim.NodeID,
Claim: claim.Mode,
State: claim.State,
WriteRequest: structs.WriteRequest{
Namespace: vol.Namespace,
},
}
resp, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, req)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
vol.ModifyIndex = index
return nil
}

// CSIPlugin wraps the structs.CSIPlugin with request data and server context
type CSIPlugin struct {
srv *Server
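A detail worth noting in the endpoint above is the checkpoint/resume structure: each detach step checkpoints the claim state through Raft, and the switch at the top of Unpublish jumps past any step a previous attempt already completed. Below is a simplified, self-contained illustration of that pattern; it is not Nomad code, and every name in it is invented for the sketch.

    // Simplified illustration of checkpoint/resume: each detach step records
    // the state it reached, so a retried unpublish skips work that already
    // succeeded on an earlier attempt.
    package main

    import "fmt"

    type claimState int

    const (
        stateTaken claimState = iota
        stateNodeDetached
        stateControllerDetached
        stateReadyToFree
    )

    type claim struct {
        state claimState
    }

    // nodeDetach and controllerDetach stand in for the ClientCSI RPCs; in the
    // real endpoint each success is persisted before moving to the next step.
    func nodeDetach(c *claim) error {
        fmt.Println("detaching from node")
        c.state = stateNodeDetached
        return nil
    }

    func controllerDetach(c *claim) error {
        fmt.Println("detaching from controller")
        c.state = stateControllerDetached
        return nil
    }

    func unpublish(c *claim) error {
        // resume from the last recorded state, mirroring the switch/goto
        // structure in csi_endpoint.go
        if c.state < stateNodeDetached {
            if err := nodeDetach(c); err != nil {
                return err
            }
        }
        if c.state < stateControllerDetached {
            if err := controllerDetach(c); err != nil {
                return err
            }
        }
        c.state = stateReadyToFree
        return nil
    }

    func main() {
        // e.g. a retry after the controller RPC failed on a previous attempt
        c := &claim{state: stateNodeDetached}
        if err := unpublish(c); err != nil {
            fmt.Println("unpublish failed:", err)
            return
        }
        fmt.Println("claim is ready to free")
    }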
100 changes: 100 additions & 0 deletions nomad/csi_endpoint_test.go
@@ -2,6 +2,7 @@ package nomad

import (
"fmt"
"strings"
"testing"

msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
@@ -405,6 +406,105 @@ func TestCSIVolumeEndpoint_ClaimWithController(t *testing.T) {
require.EqualError(t, err, "controller publish: attach volume: No path to node")
}

func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
t.Parallel()
srv, shutdown := TestServer(t, func(c *Config) { c.NumSchedulers = 0 })
defer shutdown()
testutil.WaitForLeader(t, srv.RPC)

var err error
index := uint64(1000)
ns := structs.DefaultNamespace
state := srv.fsm.State()
state.BootstrapACLTokens(1, 0, mock.ACLManagementToken())

policy := mock.NamespacePolicy(ns, "", []string{acl.NamespaceCapabilityCSIMountVolume}) +
mock.PluginPolicy("read")
index++
accessToken := mock.CreatePolicyAndToken(t, state, index, "claim", policy)

codec := rpcClient(t, srv)

type tc struct {
name string
startingState structs.CSIVolumeClaimState
hasController bool
expectedErrMsg string
}

testCases := []tc{
{
name: "no path to node plugin",
startingState: structs.CSIVolumeClaimStateTaken,
hasController: true,
expectedErrMsg: "could not detach from node: Unknown node ",
},
{
name: "no registered controller plugin",
startingState: structs.CSIVolumeClaimStateNodeDetached,
hasController: true,
expectedErrMsg: "could not detach from controller: controller detach volume: plugin missing: minnie",
},
{
name: "success",
startingState: structs.CSIVolumeClaimStateControllerDetached,
hasController: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {

volID := uuid.Generate()
nodeID := uuid.Generate()
allocID := uuid.Generate()

claim := &structs.CSIVolumeClaim{
AllocationID: allocID,
NodeID: nodeID,
ExternalNodeID: "i-example",
Mode: structs.CSIVolumeClaimRead,
State: tc.startingState,
}

vol := &structs.CSIVolume{
ID: volID,
Namespace: ns,
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
PluginID: "minnie",
Secrets: structs.CSISecrets{"mysecret": "secretvalue"},
ControllerRequired: tc.hasController,
}

index++
err = state.CSIVolumeRegister(index, []*structs.CSIVolume{vol})
require.NoError(t, err)

req := &structs.CSIVolumeUnpublishRequest{
VolumeID: volID,
Claim: claim,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: ns,
AuthToken: accessToken.SecretID,
},
}

err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Unpublish", req,
&structs.CSIVolumeUnpublishResponse{})

if tc.expectedErrMsg == "" {
require.NoError(t, err)
} else {
require.True(t, strings.Contains(err.Error(), tc.expectedErrMsg),
"error message %q did not contain %q", err.Error(), tc.expectedErrMsg)
}
})
}

}

func TestCSIVolumeEndpoint_List(t *testing.T) {
t.Parallel()
srv, shutdown := TestServer(t, func(c *Config) {