Skip to content

Commit

Permalink
csi: implement controller detach RPCs (#7356)
Browse files Browse the repository at this point in the history
This changeset implements the remaining controller detach RPCs: server-to-client and client-to-controller. The tests also uncovered a bug in our RPC for claims which is fixed here; the volume claim RPC is used for both claiming and releasing a claim on a volume. We should only submit a controller publish RPC when the claim is new and not when it's being released.
  • Loading branch information
tgross committed Mar 23, 2020
1 parent ff34f5b commit 46f1702
Show file tree
Hide file tree
Showing 7 changed files with 366 additions and 11 deletions.
37 changes: 34 additions & 3 deletions client/csi_controller_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package client
import (
"context"
"errors"
"fmt"
"time"

metrics "github.com/armon/go-metrics"
Expand Down Expand Up @@ -66,7 +65,7 @@ func (c *CSIController) ValidateVolume(req *structs.ClientCSIControllerValidateV
// 1. Validate the volume request
// 2. Call ControllerPublishVolume on the CSI Plugin to trigger a remote attachment
//
// In the future this may be expanded to request dynamic secrets for attachement.
// In the future this may be expanded to request dynamic secrets for attachment.
func (c *CSIController) AttachVolume(req *structs.ClientCSIControllerAttachVolumeRequest, resp *structs.ClientCSIControllerAttachVolumeResponse) error {
defer metrics.MeasureSince([]string{"client", "csi_controller", "publish_volume"}, time.Now())
plugin, err := c.findControllerPlugin(req.PluginID)
Expand Down Expand Up @@ -105,8 +104,40 @@ func (c *CSIController) AttachVolume(req *structs.ClientCSIControllerAttachVolum
return nil
}

// DetachVolume is used to detach a volume from a CSI Cluster from
// the storage node provided in the request.
func (c *CSIController) DetachVolume(req *structs.ClientCSIControllerDetachVolumeRequest, resp *structs.ClientCSIControllerDetachVolumeResponse) error {
return fmt.Errorf("Unimplemented")
defer metrics.MeasureSince([]string{"client", "csi_controller", "unpublish_volume"}, time.Now())
plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
return err
}
defer plugin.Close()

// The following block of validation checks should not be reached on a
// real Nomad cluster as all of this data should be validated when registering
// volumes with the cluster. They serve as a defensive check before forwarding
// requests to plugins, and to aid with development.

if req.VolumeID == "" {
return errors.New("VolumeID is required")
}

if req.ClientCSINodeID == "" {
return errors.New("ClientCSINodeID is required")
}

csiReq := req.ToCSIRequest()

// Submit the request for a volume to the CSI Plugin.
ctx, cancelFn := c.requestContext()
defer cancelFn()
_, err = plugin.ControllerUnpublishVolume(ctx, csiReq)
if err != nil {
return err
}

return nil
}

func (c *CSIController) findControllerPlugin(name string) (csi.CSIPlugin, error) {
Expand Down
83 changes: 83 additions & 0 deletions client/csi_controller_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,86 @@ func TestCSIController_ValidateVolume(t *testing.T) {
})
}
}

func TestCSIController_DetachVolume(t *testing.T) {
t.Parallel()

cases := []struct {
Name string
ClientSetupFunc func(*fake.Client)
Request *structs.ClientCSIControllerDetachVolumeRequest
ExpectedErr error
ExpectedResponse *structs.ClientCSIControllerDetachVolumeResponse
}{
{
Name: "returns plugin not found errors",
Request: &structs.ClientCSIControllerDetachVolumeRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: "some-garbage",
},
},
ExpectedErr: errors.New("plugin some-garbage for type csi-controller not found"),
},
{
Name: "validates volumeid is not empty",
Request: &structs.ClientCSIControllerDetachVolumeRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
},
ExpectedErr: errors.New("VolumeID is required"),
},
{
Name: "validates nodeid is not empty",
Request: &structs.ClientCSIControllerDetachVolumeRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
VolumeID: "1234-4321-1234-4321",
},
ExpectedErr: errors.New("ClientCSINodeID is required"),
},
{
Name: "returns transitive errors",
ClientSetupFunc: func(fc *fake.Client) {
fc.NextControllerUnpublishVolumeErr = errors.New("hello")
},
Request: &structs.ClientCSIControllerDetachVolumeRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
VolumeID: "1234-4321-1234-4321",
ClientCSINodeID: "abcde",
},
ExpectedErr: errors.New("hello"),
},
}

for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
require := require.New(t)
client, cleanup := TestClient(t, nil)
defer cleanup()

fakeClient := &fake.Client{}
if tc.ClientSetupFunc != nil {
tc.ClientSetupFunc(fakeClient)
}

dispenserFunc := func(*dynamicplugins.PluginInfo) (interface{}, error) {
return fakeClient, nil
}
client.dynamicRegistry.StubDispenserForType(dynamicplugins.PluginTypeCSIController, dispenserFunc)

err := client.dynamicRegistry.RegisterPlugin(fakePlugin)
require.Nil(err)

var resp structs.ClientCSIControllerDetachVolumeResponse
err = client.ClientRPC("CSIController.DetachVolume", tc.Request, &resp)
require.Equal(tc.ExpectedErr, err)
if tc.ExpectedResponse != nil {
require.Equal(tc.ExpectedResponse, &resp)
}
})
}
}
34 changes: 34 additions & 0 deletions nomad/client_csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,37 @@ func (a *ClientCSIController) ValidateVolume(args *cstructs.ClientCSIControllerV
}
return nil
}

func (a *ClientCSIController) DetachVolume(args *cstructs.ClientCSIControllerDetachVolumeRequest, reply *cstructs.ClientCSIControllerDetachVolumeResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "detach_volume"}, time.Now())

// Verify the arguments.
if args.ControllerNodeID == "" {
return errors.New("missing ControllerNodeID")
}

// Make sure Node is valid and new enough to support RPC
snap, err := a.srv.State().Snapshot()
if err != nil {
return err
}

_, err = getNodeForRpc(snap, args.ControllerNodeID)
if err != nil {
return err
}

// Get the connection to the client
state, ok := a.srv.getNodeConn(args.ControllerNodeID)
if !ok {
return findNodeConnAndForward(a.srv, args.ControllerNodeID, "ClientCSIController.DetachVolume", args, reply)
}

// Make the RPC
err = NodeRpc(state.Session, "CSIController.DetachVolume", args, reply)
if err != nil {
return fmt.Errorf("detach volume: %v", err)
}
return nil

}
78 changes: 78 additions & 0 deletions nomad/client_csi_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,81 @@ func TestClientCSIController_AttachVolume_Forwarded(t *testing.T) {
// Should recieve an error from the client endpoint
require.Contains(err.Error(), "must specify plugin name to dispense")
}

func TestClientCSIController_DetachVolume_Local(t *testing.T) {
t.Parallel()
require := require.New(t)

// Start a server and client
s, cleanupS := TestServer(t, nil)
defer cleanupS()
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)

c, cleanupC := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s.config.RPCAddr.String()}
})
defer cleanupC()

testutil.WaitForResult(func() (bool, error) {
nodes := s.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
require.Fail("should have a client")
})

req := &cstructs.ClientCSIControllerDetachVolumeRequest{
CSIControllerQuery: cstructs.CSIControllerQuery{ControllerNodeID: c.NodeID()},
}

// Fetch the response
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientCSIController.DetachVolume", req, &resp)
require.NotNil(err)
// Should recieve an error from the client endpoint
require.Contains(err.Error(), "must specify plugin name to dispense")
}

func TestClientCSIController_DetachVolume_Forwarded(t *testing.T) {
t.Parallel()
require := require.New(t)

// Start a server and client
s1, cleanupS1 := TestServer(t, func(c *Config) { c.BootstrapExpect = 2 })
defer cleanupS1()
s2, cleanupS2 := TestServer(t, func(c *Config) { c.BootstrapExpect = 2 })
defer cleanupS2()
TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
codec := rpcClient(t, s2)

c, cleanupC := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.config.RPCAddr.String()}
c.GCDiskUsageThreshold = 100.0
})
defer cleanupC()

testutil.WaitForResult(func() (bool, error) {
nodes := s2.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
require.Fail("should have a client")
})

// Force remove the connection locally in case it exists
s1.nodeConnsLock.Lock()
delete(s1.nodeConns, c.NodeID())
s1.nodeConnsLock.Unlock()

req := &cstructs.ClientCSIControllerDetachVolumeRequest{
CSIControllerQuery: cstructs.CSIControllerQuery{ControllerNodeID: c.NodeID()},
}

// Fetch the response
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientCSIController.DetachVolume", req, &resp)
require.NotNil(err)
// Should recieve an error from the client endpoint
require.Contains(err.Error(), "must specify plugin name to dispense")
}
Loading

0 comments on commit 46f1702

Please sign in to comment.