Skip to content

Commit

Permalink
csi: implement ControllerExpandVolume
Browse files Browse the repository at this point in the history
the first half of volume expansion,
this allows a user to update requested capacity
("capacity_min" and "capacity_max") in a volume
specification file, and re-issue either Register
or Create volume commands (or api calls).

the requested capacity will now be "reconciled"
with the current real capacity of the volume,
issuing a ControllerExpandVolume RPC call
to a running controller plugin, if requested
"capacity_min" is higher than the current
capacity on the volume in state.

csi spec:
https://github.com/container-storage-interface/spec/blob/c918b7f/spec.md#controllerexpandvolume

note: this does not yet cover NodeExpandVolume
  • Loading branch information
gulducat committed Sep 14, 2023
1 parent c28cd59 commit f71a248
Show file tree
Hide file tree
Showing 17 changed files with 1,055 additions and 89 deletions.
3 changes: 3 additions & 0 deletions .changelog/18359.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
csi: add ability to expand the size of volumes for plugins that support it
```
42 changes: 42 additions & 0 deletions client/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

metrics "github.com/armon/go-metrics"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"

"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/client/structs"
Expand Down Expand Up @@ -232,6 +233,47 @@ func (c *CSI) ControllerCreateVolume(req *structs.ClientCSIControllerCreateVolum
return nil
}

func (c *CSI) ControllerExpandVolume(req *structs.ClientCSIControllerExpandVolumeRequest, resp *structs.ClientCSIControllerExpandVolumeResponse) error {
defer metrics.MeasureSince([]string{"client", "csi_controller", "expand_volume"}, time.Now())

plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
// the server's view of the plugin health is stale, so let it know it
// should retry with another controller instance
return fmt.Errorf("CSI.ControllerExpandVolume could not find plugin: %w: %v",
nstructs.ErrCSIClientRPCRetryable, err)
}
defer plugin.Close()

csiReq := req.ToCSIRequest()

ctx, cancelFn := c.requestContext()
defer cancelFn()

// CSI ControllerExpandVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
cresp, err := plugin.ControllerExpandVolume(ctx, csiReq,
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
if errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) {
// if the volume was deleted out-of-band, we'll get an error from
// the plugin but can safely ignore it
c.c.logger.Debug("could not expand volume", "error", err)
return nil
}
if err != nil {
return fmt.Errorf("CSI.ControllerExpandVolume: %v", err)
}
if cresp == nil {
c.c.logger.Warn("plugin did not return error or response; this is a bug in the plugin and should be reported to the plugin author")
return fmt.Errorf("CSI.ControllerExpandVolume: plugin did not return error or response")
}
resp.CapacityBytes = cresp.CapacityBytes
resp.NodeExpansionRequired = cresp.NodeExpansionRequired
return nil
}

func (c *CSI) ControllerDeleteVolume(req *structs.ClientCSIControllerDeleteVolumeRequest, resp *structs.ClientCSIControllerDeleteVolumeResponse) error {
defer metrics.MeasureSince([]string{"client", "csi_controller", "delete_volume"}, time.Now())

Expand Down
95 changes: 94 additions & 1 deletion client/csi_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@ package client

import (
"errors"
"fmt"
"testing"

"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/client/structs"
nstructs "github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/csi"
"github.com/hashicorp/nomad/plugins/csi/fake"
"github.com/stretchr/testify/require"
)

var fakePlugin = &dynamicplugins.PluginInfo{
Expand Down Expand Up @@ -463,6 +467,95 @@ func TestCSIController_CreateVolume(t *testing.T) {
}
}

func TestCSIController_ExpandVolume(t *testing.T) {
cases := []struct {
Name string
ModRequest func(request *structs.ClientCSIControllerExpandVolumeRequest)
NextResp *csi.ControllerExpandVolumeResponse
NextErr error
ExpectErr string
}{
{
Name: "success",
NextResp: &csi.ControllerExpandVolumeResponse{
CapacityBytes: 99,
NodeExpansionRequired: true,
},
},
{
Name: "plugin not found",
ModRequest: func(r *structs.ClientCSIControllerExpandVolumeRequest) {
r.CSIControllerQuery.PluginID = "nonexistent"
},
ExpectErr: "CSI.ControllerExpandVolume could not find plugin: CSI client error (retryable): plugin nonexistent for type csi-controller not found",
},
{
Name: "ignorable error",
NextResp: &csi.ControllerExpandVolumeResponse{},
NextErr: fmt.Errorf("you can ignore me (%w)", nstructs.ErrCSIClientRPCIgnorable),
ExpectErr: "", // explicitly empty here for clarity.
},
{
Name: "controller error",
NextErr: errors.New("sad plugin"),
ExpectErr: "CSI.ControllerExpandVolume: sad plugin",
},
{
Name: "nil response from plugin",
NextResp: nil, // again explicit for clarity.
ExpectErr: "CSI.ControllerExpandVolume: plugin did not return error or response",
},
}

for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
client, cleanup := TestClient(t, nil)
t.Cleanup(func() { test.NoError(t, cleanup()) })

fakeClient := &fake.Client{
NextControllerExpandVolumeResponse: tc.NextResp,
NextControllerExpandVolumeErr: tc.NextErr,
}

dispenserFunc := func(*dynamicplugins.PluginInfo) (interface{}, error) {
return fakeClient, nil
}
client.dynamicRegistry.StubDispenserForType(
dynamicplugins.PluginTypeCSIController, dispenserFunc)
err := client.dynamicRegistry.RegisterPlugin(fakePlugin)
must.NoError(t, err)

req := &structs.ClientCSIControllerExpandVolumeRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},

ExternalVolumeID: "some-volume-id",
CapacityRange: &csi.CapacityRange{
RequiredBytes: 99,
},
Secrets: map[string]string{"super": "secret"},
}
if tc.ModRequest != nil {
tc.ModRequest(req)
}

var resp structs.ClientCSIControllerExpandVolumeResponse
err = client.ClientRPC("CSI.ControllerExpandVolume", req, &resp)

if tc.ExpectErr != "" {
must.EqError(t, err, tc.ExpectErr)
return
}
must.NoError(t, err)
must.Eq(t, tc.NextResp.CapacityBytes, resp.CapacityBytes)
must.Eq(t, tc.NextResp.NodeExpansionRequired, resp.NodeExpansionRequired)

})
}

}

func TestCSIController_DeleteVolume(t *testing.T) {
ci.Parallel(t)

Expand Down
30 changes: 30 additions & 0 deletions client/structs/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,36 @@ type ClientCSIControllerCreateVolumeResponse struct {
Topologies []*structs.CSITopology
}

// ClientCSIControllerExpandVolumeRequest is the RPC made from the server to a
// Nomad client to tell a CSI controller plugin on that client to perform
// ControllerExpandVolume
type ClientCSIControllerExpandVolumeRequest struct {
ExternalVolumeID string
CapacityRange *csi.CapacityRange
Secrets structs.CSISecrets
VolumeCapability *csi.VolumeCapability

CSIControllerQuery
}

func (req *ClientCSIControllerExpandVolumeRequest) ToCSIRequest() *csi.ControllerExpandVolumeRequest {
csiReq := &csi.ControllerExpandVolumeRequest{
ExternalVolumeID: req.ExternalVolumeID,
Capability: req.VolumeCapability,
Secrets: req.Secrets,
}
if req.CapacityRange != nil {
csiReq.RequiredBytes = req.CapacityRange.RequiredBytes
csiReq.LimitBytes = req.CapacityRange.LimitBytes
}
return csiReq
}

type ClientCSIControllerExpandVolumeResponse struct {
CapacityBytes int64
NodeExpansionRequired bool
}

// ClientCSIControllerDeleteVolumeRequest the RPC made from the server to a
// Nomad client to tell a CSI controller plugin on that client to perform
// DeleteVolume
Expand Down
15 changes: 15 additions & 0 deletions nomad/client_csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"

cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand Down Expand Up @@ -85,6 +86,20 @@ func (a *ClientCSI) ControllerCreateVolume(args *cstructs.ClientCSIControllerCre
return nil
}

func (a *ClientCSI) ControllerExpandVolume(args *cstructs.ClientCSIControllerExpandVolumeRequest, reply *cstructs.ClientCSIControllerExpandVolumeResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "expand_volume"}, time.Now())

err := a.sendCSIControllerRPC(args.PluginID,
"CSI.ControllerExpandVolume",
"ClientCSI.ControllerExpandVolume",
structs.RateMetricWrite,
args, reply)
if err != nil {
return fmt.Errorf("controller expand volume: %v", err)
}
return nil
}

func (a *ClientCSI) ControllerDeleteVolume(args *cstructs.ClientCSIControllerDeleteVolumeRequest, reply *cstructs.ClientCSIControllerDeleteVolumeResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "delete_volume"}, time.Now())

Expand Down
48 changes: 28 additions & 20 deletions nomad/client_csi_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,33 @@ import (
// responses that have no bodies have no "Next*Response" field and will always
// return an empty response body.
type MockClientCSI struct {
NextValidateError error
NextAttachError error
NextAttachResponse *cstructs.ClientCSIControllerAttachVolumeResponse
NextDetachError error
NextCreateError error
NextCreateResponse *cstructs.ClientCSIControllerCreateVolumeResponse
NextDeleteError error
NextListExternalError error
NextListExternalResponse *cstructs.ClientCSIControllerListVolumesResponse
NextCreateSnapshotError error
NextCreateSnapshotResponse *cstructs.ClientCSIControllerCreateSnapshotResponse
NextDeleteSnapshotError error
NextListExternalSnapshotsError error
NextListExternalSnapshotsResponse *cstructs.ClientCSIControllerListSnapshotsResponse
NextNodeDetachError error
NextValidateError error
NextAttachError error
NextAttachResponse *cstructs.ClientCSIControllerAttachVolumeResponse
NextDetachError error
NextCreateError error
NextCreateResponse *cstructs.ClientCSIControllerCreateVolumeResponse
NextDeleteError error
NextListExternalError error
NextListExternalResponse *cstructs.ClientCSIControllerListVolumesResponse
NextCreateSnapshotError error
NextCreateSnapshotResponse *cstructs.ClientCSIControllerCreateSnapshotResponse
NextDeleteSnapshotError error
NextListExternalSnapshotsError error
NextListExternalSnapshotsResponse *cstructs.ClientCSIControllerListSnapshotsResponse
NextControllerExpandVolumeError error
NextControllerExpandVolumeResponse *cstructs.ClientCSIControllerExpandVolumeResponse
NextNodeDetachError error
}

func newMockClientCSI() *MockClientCSI {
return &MockClientCSI{
NextAttachResponse: &cstructs.ClientCSIControllerAttachVolumeResponse{},
NextCreateResponse: &cstructs.ClientCSIControllerCreateVolumeResponse{},
NextListExternalResponse: &cstructs.ClientCSIControllerListVolumesResponse{},
NextCreateSnapshotResponse: &cstructs.ClientCSIControllerCreateSnapshotResponse{},
NextListExternalSnapshotsResponse: &cstructs.ClientCSIControllerListSnapshotsResponse{},
NextAttachResponse: &cstructs.ClientCSIControllerAttachVolumeResponse{},
NextCreateResponse: &cstructs.ClientCSIControllerCreateVolumeResponse{},
NextListExternalResponse: &cstructs.ClientCSIControllerListVolumesResponse{},
NextCreateSnapshotResponse: &cstructs.ClientCSIControllerCreateSnapshotResponse{},
NextListExternalSnapshotsResponse: &cstructs.ClientCSIControllerListSnapshotsResponse{},
NextControllerExpandVolumeResponse: &cstructs.ClientCSIControllerExpandVolumeResponse{},
}
}

Expand Down Expand Up @@ -96,6 +99,11 @@ func (c *MockClientCSI) ControllerListSnapshots(req *cstructs.ClientCSIControlle
return c.NextListExternalSnapshotsError
}

func (c *MockClientCSI) ControllerExpandVolume(req *cstructs.ClientCSIControllerExpandVolumeRequest, resp *cstructs.ClientCSIControllerExpandVolumeResponse) error {
*resp = *c.NextControllerExpandVolumeResponse
return c.NextControllerExpandVolumeError
}

func (c *MockClientCSI) NodeDetachVolume(req *cstructs.ClientCSINodeDetachVolumeRequest, resp *cstructs.ClientCSINodeDetachVolumeResponse) error {
return c.NextNodeDetachError
}
Expand Down
Loading

0 comments on commit f71a248

Please sign in to comment.