From f71a248d5c726cce972de3980fb0040bdede5a6f Mon Sep 17 00:00:00 2001 From: Daniel Bennett Date: Wed, 19 Jul 2023 15:37:44 -0500 Subject: [PATCH] csi: implement ControllerExpandVolume the first half of volume expansion, this allows a user to update requested capacity ("capacity_min" and "capacity_max") in a volume specification file, and re-issue either Register or Create volume commands (or api calls). the requested capacity will now be "reconciled" with the current real capacity of the volume, issuing a ControllerExpandVolume RPC call to a running controller plugin, if requested "capacity_min" is higher than the current capacity on the volume in state. csi spec: https://github.com/container-storage-interface/spec/blob/c918b7f/spec.md#controllerexpandvolume note: this does not yet cover NodeExpandVolume --- .changelog/18359.txt | 3 + client/csi_endpoint.go | 42 +++++ client/csi_endpoint_test.go | 95 +++++++++- client/structs/csi.go | 30 ++++ nomad/client_csi_endpoint.go | 15 ++ nomad/client_csi_endpoint_test.go | 48 ++--- nomad/csi_endpoint.go | 202 ++++++++++++++++++--- nomad/csi_endpoint_test.go | 290 ++++++++++++++++++++++++++++++ nomad/state/state_store.go | 14 +- nomad/state/state_store_test.go | 6 +- nomad/structs/csi.go | 42 +++-- nomad/structs/csi_test.go | 11 -- plugins/csi/client.go | 53 +++++- plugins/csi/client_test.go | 160 ++++++++++++++++- plugins/csi/fake/client.go | 32 +++- plugins/csi/plugin.go | 86 ++++++++- plugins/csi/testing/client.go | 15 ++ 17 files changed, 1055 insertions(+), 89 deletions(-) create mode 100644 .changelog/18359.txt diff --git a/.changelog/18359.txt b/.changelog/18359.txt new file mode 100644 index 00000000000..04d582232f6 --- /dev/null +++ b/.changelog/18359.txt @@ -0,0 +1,3 @@ +```release-note:improvement +csi: add ability to expand the size of volumes for plugins that support it +``` diff --git a/client/csi_endpoint.go b/client/csi_endpoint.go index f8b34d487cb..933ad96d695 100644 --- a/client/csi_endpoint.go +++ b/client/csi_endpoint.go @@ -11,6 +11,7 @@ import ( metrics "github.com/armon/go-metrics" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" + "github.com/hashicorp/nomad/client/dynamicplugins" "github.com/hashicorp/nomad/client/pluginmanager/csimanager" "github.com/hashicorp/nomad/client/structs" @@ -232,6 +233,47 @@ func (c *CSI) ControllerCreateVolume(req *structs.ClientCSIControllerCreateVolum return nil } +func (c *CSI) ControllerExpandVolume(req *structs.ClientCSIControllerExpandVolumeRequest, resp *structs.ClientCSIControllerExpandVolumeResponse) error { + defer metrics.MeasureSince([]string{"client", "csi_controller", "expand_volume"}, time.Now()) + + plugin, err := c.findControllerPlugin(req.PluginID) + if err != nil { + // the server's view of the plugin health is stale, so let it know it + // should retry with another controller instance + return fmt.Errorf("CSI.ControllerExpandVolume could not find plugin: %w: %v", + nstructs.ErrCSIClientRPCRetryable, err) + } + defer plugin.Close() + + csiReq := req.ToCSIRequest() + + ctx, cancelFn := c.requestContext() + defer cancelFn() + + // CSI ControllerExpandVolume errors for timeout, codes.Unavailable and + // codes.ResourceExhausted are retried; all other errors are fatal. + cresp, err := plugin.ControllerExpandVolume(ctx, csiReq, + grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout), + grpc_retry.WithMax(3), + grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond))) + if errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) { + // if the volume was deleted out-of-band, we'll get an error from + // the plugin but can safely ignore it + c.c.logger.Debug("could not expand volume", "error", err) + return nil + } + if err != nil { + return fmt.Errorf("CSI.ControllerExpandVolume: %v", err) + } + if cresp == nil { + c.c.logger.Warn("plugin did not return error or response; this is a bug in the plugin and should be reported to the plugin author") + return fmt.Errorf("CSI.ControllerExpandVolume: plugin did not return error or response") + } + resp.CapacityBytes = cresp.CapacityBytes + resp.NodeExpansionRequired = cresp.NodeExpansionRequired + return nil +} + func (c *CSI) ControllerDeleteVolume(req *structs.ClientCSIControllerDeleteVolumeRequest, resp *structs.ClientCSIControllerDeleteVolumeResponse) error { defer metrics.MeasureSince([]string{"client", "csi_controller", "delete_volume"}, time.Now()) diff --git a/client/csi_endpoint_test.go b/client/csi_endpoint_test.go index 76b41cbca7f..a83b975ba45 100644 --- a/client/csi_endpoint_test.go +++ b/client/csi_endpoint_test.go @@ -5,15 +5,19 @@ package client import ( "errors" + "fmt" "testing" + "github.com/shoenig/test" + "github.com/shoenig/test/must" + "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/dynamicplugins" "github.com/hashicorp/nomad/client/structs" nstructs "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/csi" "github.com/hashicorp/nomad/plugins/csi/fake" - "github.com/stretchr/testify/require" ) var fakePlugin = &dynamicplugins.PluginInfo{ @@ -463,6 +467,95 @@ func TestCSIController_CreateVolume(t *testing.T) { } } +func TestCSIController_ExpandVolume(t *testing.T) { + cases := []struct { + Name string + ModRequest func(request *structs.ClientCSIControllerExpandVolumeRequest) + NextResp *csi.ControllerExpandVolumeResponse + NextErr error + ExpectErr string + }{ + { + Name: "success", + NextResp: &csi.ControllerExpandVolumeResponse{ + CapacityBytes: 99, + NodeExpansionRequired: true, + }, + }, + { + Name: "plugin not found", + ModRequest: func(r *structs.ClientCSIControllerExpandVolumeRequest) { + r.CSIControllerQuery.PluginID = "nonexistent" + }, + ExpectErr: "CSI.ControllerExpandVolume could not find plugin: CSI client error (retryable): plugin nonexistent for type csi-controller not found", + }, + { + Name: "ignorable error", + NextResp: &csi.ControllerExpandVolumeResponse{}, + NextErr: fmt.Errorf("you can ignore me (%w)", nstructs.ErrCSIClientRPCIgnorable), + ExpectErr: "", // explicitly empty here for clarity. + }, + { + Name: "controller error", + NextErr: errors.New("sad plugin"), + ExpectErr: "CSI.ControllerExpandVolume: sad plugin", + }, + { + Name: "nil response from plugin", + NextResp: nil, // again explicit for clarity. + ExpectErr: "CSI.ControllerExpandVolume: plugin did not return error or response", + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + client, cleanup := TestClient(t, nil) + t.Cleanup(func() { test.NoError(t, cleanup()) }) + + fakeClient := &fake.Client{ + NextControllerExpandVolumeResponse: tc.NextResp, + NextControllerExpandVolumeErr: tc.NextErr, + } + + dispenserFunc := func(*dynamicplugins.PluginInfo) (interface{}, error) { + return fakeClient, nil + } + client.dynamicRegistry.StubDispenserForType( + dynamicplugins.PluginTypeCSIController, dispenserFunc) + err := client.dynamicRegistry.RegisterPlugin(fakePlugin) + must.NoError(t, err) + + req := &structs.ClientCSIControllerExpandVolumeRequest{ + CSIControllerQuery: structs.CSIControllerQuery{ + PluginID: fakePlugin.Name, + }, + + ExternalVolumeID: "some-volume-id", + CapacityRange: &csi.CapacityRange{ + RequiredBytes: 99, + }, + Secrets: map[string]string{"super": "secret"}, + } + if tc.ModRequest != nil { + tc.ModRequest(req) + } + + var resp structs.ClientCSIControllerExpandVolumeResponse + err = client.ClientRPC("CSI.ControllerExpandVolume", req, &resp) + + if tc.ExpectErr != "" { + must.EqError(t, err, tc.ExpectErr) + return + } + must.NoError(t, err) + must.Eq(t, tc.NextResp.CapacityBytes, resp.CapacityBytes) + must.Eq(t, tc.NextResp.NodeExpansionRequired, resp.NodeExpansionRequired) + + }) + } + +} + func TestCSIController_DeleteVolume(t *testing.T) { ci.Parallel(t) diff --git a/client/structs/csi.go b/client/structs/csi.go index 1b8746112c4..131071196df 100644 --- a/client/structs/csi.go +++ b/client/structs/csi.go @@ -287,6 +287,36 @@ type ClientCSIControllerCreateVolumeResponse struct { Topologies []*structs.CSITopology } +// ClientCSIControllerExpandVolumeRequest is the RPC made from the server to a +// Nomad client to tell a CSI controller plugin on that client to perform +// ControllerExpandVolume +type ClientCSIControllerExpandVolumeRequest struct { + ExternalVolumeID string + CapacityRange *csi.CapacityRange + Secrets structs.CSISecrets + VolumeCapability *csi.VolumeCapability + + CSIControllerQuery +} + +func (req *ClientCSIControllerExpandVolumeRequest) ToCSIRequest() *csi.ControllerExpandVolumeRequest { + csiReq := &csi.ControllerExpandVolumeRequest{ + ExternalVolumeID: req.ExternalVolumeID, + Capability: req.VolumeCapability, + Secrets: req.Secrets, + } + if req.CapacityRange != nil { + csiReq.RequiredBytes = req.CapacityRange.RequiredBytes + csiReq.LimitBytes = req.CapacityRange.LimitBytes + } + return csiReq +} + +type ClientCSIControllerExpandVolumeResponse struct { + CapacityBytes int64 + NodeExpansionRequired bool +} + // ClientCSIControllerDeleteVolumeRequest the RPC made from the server to a // Nomad client to tell a CSI controller plugin on that client to perform // DeleteVolume diff --git a/nomad/client_csi_endpoint.go b/nomad/client_csi_endpoint.go index 197e17eeb49..f1c531ce09b 100644 --- a/nomad/client_csi_endpoint.go +++ b/nomad/client_csi_endpoint.go @@ -13,6 +13,7 @@ import ( metrics "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/structs" ) @@ -85,6 +86,20 @@ func (a *ClientCSI) ControllerCreateVolume(args *cstructs.ClientCSIControllerCre return nil } +func (a *ClientCSI) ControllerExpandVolume(args *cstructs.ClientCSIControllerExpandVolumeRequest, reply *cstructs.ClientCSIControllerExpandVolumeResponse) error { + defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "expand_volume"}, time.Now()) + + err := a.sendCSIControllerRPC(args.PluginID, + "CSI.ControllerExpandVolume", + "ClientCSI.ControllerExpandVolume", + structs.RateMetricWrite, + args, reply) + if err != nil { + return fmt.Errorf("controller expand volume: %v", err) + } + return nil +} + func (a *ClientCSI) ControllerDeleteVolume(args *cstructs.ClientCSIControllerDeleteVolumeRequest, reply *cstructs.ClientCSIControllerDeleteVolumeResponse) error { defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "delete_volume"}, time.Now()) diff --git a/nomad/client_csi_endpoint_test.go b/nomad/client_csi_endpoint_test.go index 1cb0ac3bfc1..74c4adb0509 100644 --- a/nomad/client_csi_endpoint_test.go +++ b/nomad/client_csi_endpoint_test.go @@ -28,30 +28,33 @@ import ( // responses that have no bodies have no "Next*Response" field and will always // return an empty response body. type MockClientCSI struct { - NextValidateError error - NextAttachError error - NextAttachResponse *cstructs.ClientCSIControllerAttachVolumeResponse - NextDetachError error - NextCreateError error - NextCreateResponse *cstructs.ClientCSIControllerCreateVolumeResponse - NextDeleteError error - NextListExternalError error - NextListExternalResponse *cstructs.ClientCSIControllerListVolumesResponse - NextCreateSnapshotError error - NextCreateSnapshotResponse *cstructs.ClientCSIControllerCreateSnapshotResponse - NextDeleteSnapshotError error - NextListExternalSnapshotsError error - NextListExternalSnapshotsResponse *cstructs.ClientCSIControllerListSnapshotsResponse - NextNodeDetachError error + NextValidateError error + NextAttachError error + NextAttachResponse *cstructs.ClientCSIControllerAttachVolumeResponse + NextDetachError error + NextCreateError error + NextCreateResponse *cstructs.ClientCSIControllerCreateVolumeResponse + NextDeleteError error + NextListExternalError error + NextListExternalResponse *cstructs.ClientCSIControllerListVolumesResponse + NextCreateSnapshotError error + NextCreateSnapshotResponse *cstructs.ClientCSIControllerCreateSnapshotResponse + NextDeleteSnapshotError error + NextListExternalSnapshotsError error + NextListExternalSnapshotsResponse *cstructs.ClientCSIControllerListSnapshotsResponse + NextControllerExpandVolumeError error + NextControllerExpandVolumeResponse *cstructs.ClientCSIControllerExpandVolumeResponse + NextNodeDetachError error } func newMockClientCSI() *MockClientCSI { return &MockClientCSI{ - NextAttachResponse: &cstructs.ClientCSIControllerAttachVolumeResponse{}, - NextCreateResponse: &cstructs.ClientCSIControllerCreateVolumeResponse{}, - NextListExternalResponse: &cstructs.ClientCSIControllerListVolumesResponse{}, - NextCreateSnapshotResponse: &cstructs.ClientCSIControllerCreateSnapshotResponse{}, - NextListExternalSnapshotsResponse: &cstructs.ClientCSIControllerListSnapshotsResponse{}, + NextAttachResponse: &cstructs.ClientCSIControllerAttachVolumeResponse{}, + NextCreateResponse: &cstructs.ClientCSIControllerCreateVolumeResponse{}, + NextListExternalResponse: &cstructs.ClientCSIControllerListVolumesResponse{}, + NextCreateSnapshotResponse: &cstructs.ClientCSIControllerCreateSnapshotResponse{}, + NextListExternalSnapshotsResponse: &cstructs.ClientCSIControllerListSnapshotsResponse{}, + NextControllerExpandVolumeResponse: &cstructs.ClientCSIControllerExpandVolumeResponse{}, } } @@ -96,6 +99,11 @@ func (c *MockClientCSI) ControllerListSnapshots(req *cstructs.ClientCSIControlle return c.NextListExternalSnapshotsError } +func (c *MockClientCSI) ControllerExpandVolume(req *cstructs.ClientCSIControllerExpandVolumeRequest, resp *cstructs.ClientCSIControllerExpandVolumeResponse) error { + *resp = *c.NextControllerExpandVolumeResponse + return c.NextControllerExpandVolumeError +} + func (c *MockClientCSI) NodeDetachVolume(req *cstructs.ClientCSINodeDetachVolumeRequest, resp *cstructs.ClientCSINodeDetachVolumeResponse) error { return c.NextNodeDetachError } diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 7dd217d428a..70598206490 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -5,12 +5,14 @@ package nomad import ( "context" + "errors" "fmt" "net/http" "strings" "time" "github.com/armon/go-metrics" + "github.com/dustin/go-humanize" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" @@ -20,6 +22,7 @@ import ( "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/state/paginator" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/csi" ) // CSIVolume wraps the structs.CSIVolume with request data and server context @@ -228,7 +231,7 @@ func (v *CSIVolume) Get(args *structs.CSIVolumeGetRequest, reply *structs.CSIVol return v.srv.blockingRPC(&opts) } -func (v *CSIVolume) pluginValidateVolume(req *structs.CSIVolumeRegisterRequest, vol *structs.CSIVolume) (*structs.CSIPlugin, error) { +func (v *CSIVolume) pluginValidateVolume(vol *structs.CSIVolume) (*structs.CSIPlugin, error) { state := v.srv.fsm.State() plugin, err := state.CSIPluginByID(nil, vol.PluginID) @@ -239,6 +242,10 @@ func (v *CSIVolume) pluginValidateVolume(req *structs.CSIVolumeRegisterRequest, return nil, fmt.Errorf("no CSI plugin named: %s could be found", vol.PluginID) } + if plugin.ControllerRequired && plugin.ControllersHealthy < 1 { + return nil, fmt.Errorf("no healthy controllers for CSI plugin: %s", vol.PluginID) + } + vol.Provider = plugin.Provider vol.ProviderVersion = plugin.Version @@ -267,9 +274,11 @@ func (v *CSIVolume) controllerValidateVolume(req *structs.CSIVolumeRegisterReque return v.srv.RPC(method, cReq, cResp) } -// Register registers a new volume or updates an existing volume. Note -// that most user-defined CSIVolume fields are immutable once the -// volume has been created. +// Register registers a new volume or updates an existing volume. +// +// Note that most user-defined CSIVolume fields are immutable once +// the volume has been created, but exceptions include min and max +// requested capacity values. // // If the user needs to change fields because they've misconfigured // the registration of the external volume, we expect that claims @@ -325,6 +334,11 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru return err } + plugin, err := v.pluginValidateVolume(vol) + if err != nil { + return err + } + // CSIVolume has many user-defined fields which are immutable // once set, and many fields that are controlled by Nomad and // are not user-settable. We merge onto a copy of the existing @@ -335,11 +349,14 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru // Terraform). if existingVol != nil { existingVol = existingVol.Copy() - err = existingVol.Merge(vol) - if err != nil { - return err + + // reconcile mutable fields + if err = v.reconcileVolume(plugin, existingVol, vol); err != nil { + return fmt.Errorf("unable to update volume: %s", err) } + *vol = *existingVol + } else if vol.Topologies == nil || len(vol.Topologies) == 0 { // The topologies for the volume have already been set // when it was created, so for newly register volumes @@ -349,10 +366,6 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru } } - plugin, err := v.pluginValidateVolume(args, vol) - if err != nil { - return err - } if err := v.controllerValidateVolume(args, vol, plugin); err != nil { return err } @@ -369,6 +382,24 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru return nil } +// reconcileVolume updates a volume with many of the contents of another. +// It may or may not do extra work to actually expand a volume outside of Nomad, +// depending on whether requested capacity values have changed. +func (v *CSIVolume) reconcileVolume(plugin *structs.CSIPlugin, vol *structs.CSIVolume, update *structs.CSIVolume) error { + // Merge does some validation, before we attempt any potential CSI RPCs, + // and mutates `vol` with (most of) the values of `update`, + // notably excluding capacity values, which are covered below. + err := vol.Merge(update) + if err != nil { + return err + } + // expandVolume will mutate `vol` with new capacity-related values, if needed. + return v.expandVolume(vol, plugin, &csi.CapacityRange{ + RequiredBytes: update.RequestedCapacityMin, + LimitBytes: update.RequestedCapacityMax, + }) +} + // Deregister removes a set of volumes func (v *CSIVolume) Deregister(args *structs.CSIVolumeDeregisterRequest, reply *structs.CSIVolumeDeregisterResponse) error { @@ -1023,6 +1054,8 @@ func (v *CSIVolume) Create(args *structs.CSIVolumeCreateRequest, reply *structs. type validated struct { vol *structs.CSIVolume plugin *structs.CSIPlugin + // if the volume already exists, we'll update it instead of creating. + current *structs.CSIVolume } validatedVols := []validated{} @@ -1036,7 +1069,7 @@ func (v *CSIVolume) Create(args *structs.CSIVolumeCreateRequest, reply *structs. if err = vol.Validate(); err != nil { return err } - plugin, err := v.pluginValidateVolume(regArgs, vol) + plugin, err := v.pluginValidateVolume(vol) if err != nil { return err } @@ -1047,7 +1080,19 @@ func (v *CSIVolume) Create(args *structs.CSIVolumeCreateRequest, reply *structs. return fmt.Errorf("plugin does not support creating volumes") } - validatedVols = append(validatedVols, validated{vol, plugin}) + // if the volume already exists, we'll update it instead + snap, err := v.srv.State().Snapshot() + if err != nil { + return err + } + // current will be nil if it does not exist. + current, err := snap.CSIVolumeByID(nil, vol.Namespace, vol.ID) + if err != nil { + return err + } + + validatedVols = append(validatedVols, + validated{vol, plugin, current}) } // Attempt to create all the validated volumes and write only successfully @@ -1062,20 +1107,37 @@ func (v *CSIVolume) Create(args *structs.CSIVolumeCreateRequest, reply *structs. // eval" that can do the plugin RPCs async. var mErr multierror.Error + var index uint64 for _, valid := range validatedVols { - err = v.createVolume(valid.vol, valid.plugin) - if err != nil { - multierror.Append(&mErr, err) + if valid.current != nil { + // reconcile mutable fields + cp := valid.current.Copy() + err = v.reconcileVolume(valid.plugin, cp, valid.vol) + if err != nil { + mErr.Errors = append(mErr.Errors, err) + } else { + // we merged valid.vol into cp, so update state with the copy + regArgs.Volumes = append(regArgs.Volumes, cp) + } + } else { - regArgs.Volumes = append(regArgs.Volumes, valid.vol) + err = v.createVolume(valid.vol, valid.plugin) + if err != nil { + mErr.Errors = append(mErr.Errors, err) + } else { + regArgs.Volumes = append(regArgs.Volumes, valid.vol) + } } } - _, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, regArgs) - if err != nil { - v.logger.Error("csi raft apply failed", "error", err, "method", "register") - multierror.Append(&mErr, err) + // If we created or updated volumes, apply them to raft. + if len(regArgs.Volumes) > 0 { + _, index, err = v.srv.raftApply(structs.CSIVolumeRegisterRequestType, regArgs) + if err != nil { + v.logger.Error("csi raft apply failed", "error", err, "method", "register") + mErr.Errors = append(mErr.Errors, err) + } } err = mErr.ErrorOrNil() @@ -1118,6 +1180,104 @@ func (v *CSIVolume) createVolume(vol *structs.CSIVolume, plugin *structs.CSIPlug return nil } +// expandVolume validates the requested capacity values and issues +// ControllerExpandVolume (and NodeExpandVolume, if needed) to the CSI plugin, +// via Nomad client RPC. +// +// Note that capacity can only be increased; reduction in size is not possible, +// and if the volume is already at the desired capacity, no action is taken. +// vol Capacity-related values are mutated if successful, so callers should +// pass in a copy, then commit changes to raft. +func (v *CSIVolume) expandVolume(vol *structs.CSIVolume, plugin *structs.CSIPlugin, capacity *csi.CapacityRange) error { + if vol == nil || plugin == nil || capacity == nil { + return errors.New("unexpected nil value") + } + + newMax := capacity.LimitBytes + newMin := capacity.RequiredBytes + logger := v.logger.Named("expandVolume").With( + "vol", vol.ID, + "requested_min", humanize.Bytes(uint64(newMin)), + "requested_max", humanize.Bytes(uint64(newMax)), + ) + + // If requested capacity values are unset, skip everything. + if newMax == 0 && newMin == 0 { + logger.Debug("min and max values are zero") + return nil + } + + // New values same as current, so nothing to do. + if vol.RequestedCapacityMax == newMax && + vol.RequestedCapacityMin == newMin { + logger.Debug("requested capacity unchanged") + return nil + } + + // If max is specified, it cannot be less than min or current capacity. + if newMax > 0 { + if newMax < newMin { + return fmt.Errorf("max requested capacity (%s) less than or equal to min (%s)", + humanize.Bytes(uint64(newMax)), + humanize.Bytes(uint64(newMin))) + } + if newMax < vol.Capacity { + return fmt.Errorf("max requested capacity (%s) less than or equal to current (%s)", + humanize.Bytes(uint64(newMax)), + humanize.Bytes(uint64(vol.Capacity))) + } + } + + // Values are validated, so go ahead and update vol to commit to state, + // even if the external volume does not need expanding. + vol.RequestedCapacityMin = newMin + vol.RequestedCapacityMax = newMax + + // Only expand if new min is greater than current capacity. + if newMin <= vol.Capacity { + return nil + } + + if !plugin.HasControllerCapability(structs.CSIControllerSupportsExpand) { + return errors.New("expand is not implemented by this controller plugin") + } + + capability, err := csi.VolumeCapabilityFromStructs(vol.AttachmentMode, vol.AccessMode, vol.MountOptions) + if err != nil { + logger.Debug("unable to get capability from volume", "error", err) + // We'll optimistically send a nil capability, as an "unknown" + // attachment mode (likely not attached) is acceptable per the spec. + } + + method := "ClientCSI.ControllerExpandVolume" + cReq := &cstructs.ClientCSIControllerExpandVolumeRequest{ + ExternalVolumeID: vol.ExternalID, + Secrets: vol.Secrets, + CapacityRange: capacity, + VolumeCapability: capability, + } + cReq.PluginID = plugin.ID + cResp := &cstructs.ClientCSIControllerExpandVolumeResponse{} + + logger.Info("starting volume expansion") + // This is the real work. The client RPC sends a gRPC to the controller plugin, + // then that controller may reach out to cloud APIs, etc. + err = v.serializedControllerRPC(plugin.ID, func() error { + return v.srv.RPC(method, cReq, cResp) + }) + if err != nil { + return fmt.Errorf("unable to expand volume: %w", err) + } + vol.Capacity = cResp.CapacityBytes + logger.Info("controller done expanding volume") + + if cResp.NodeExpansionRequired { + v.logger.Warn("TODO: also do node volume expansion if needed") // TODO + } + + return nil +} + func (v *CSIVolume) Delete(args *structs.CSIVolumeDeleteRequest, reply *structs.CSIVolumeDeleteResponse) error { authErr := v.srv.Authenticate(v.ctx, args) diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index aa548c1f423..dce281428b3 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -26,6 +26,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/csi" "github.com/hashicorp/nomad/testutil" ) @@ -126,6 +127,83 @@ func TestCSIVolumeEndpoint_Get_ACL(t *testing.T) { require.Equal(t, vols[0].ID, resp.Volume.ID) } +func TestCSIVolume_pluginValidateVolume(t *testing.T) { + // bare minimum server for this method + store := state.TestStateStore(t) + srv := &Server{ + fsm: &nomadFSM{state: store}, + } + // has our method under test + csiVolume := &CSIVolume{srv: srv} + // volume for which we will request a valid plugin + vol := &structs.CSIVolume{PluginID: "neat-plugin"} + + // plugin not found + got, err := csiVolume.pluginValidateVolume(vol) + must.Nil(t, got, must.Sprint("nonexistent plugin should be nil")) + must.ErrorContains(t, err, "no CSI plugin named") + + // we'll upsert this plugin after optionally modifying it + basePlug := &structs.CSIPlugin{ + ID: vol.PluginID, + // these should be set on the volume after success + Provider: "neat-provider", + Version: "v0", + // explicit zero values, because these modify behavior we care about + ControllerRequired: false, + ControllersHealthy: 0, + } + + cases := []struct { + name string + updatePlugin func(*structs.CSIPlugin) + expectErr string + }{ + { + name: "controller not required", + }, + { + name: "controller unhealthy", + updatePlugin: func(p *structs.CSIPlugin) { + p.ControllerRequired = true + }, + expectErr: "no healthy controllers", + }, + { + name: "controller healthy", + updatePlugin: func(p *structs.CSIPlugin) { + p.ControllerRequired = true + p.ControllersHealthy = 1 + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + vol := vol.Copy() + plug := basePlug.Copy() + + if tc.updatePlugin != nil { + tc.updatePlugin(plug) + } + must.NoError(t, store.UpsertCSIPlugin(1000, plug)) + + got, err := csiVolume.pluginValidateVolume(vol) + + if tc.expectErr == "" { + must.NoError(t, err) + must.NotNil(t, got, must.Sprint("plugin should not be nil")) + must.Eq(t, vol.Provider, plug.Provider) + must.Eq(t, vol.ProviderVersion, plug.Version) + } else { + must.Error(t, err, must.Sprint("expect error:", tc.expectErr)) + must.ErrorContains(t, err, tc.expectErr) + must.Nil(t, got, must.Sprint("plugin should be nil")) + } + }) + } +} + func TestCSIVolumeEndpoint_Register(t *testing.T) { ci.Parallel(t) srv, shutdown := TestServer(t, func(c *Config) { @@ -1701,6 +1779,143 @@ func TestCSIVolumeEndpoint_ListSnapshots(t *testing.T) { require.Equal(t, "page2", resp.NextToken) } +func TestCSIVolume_expandVolume(t *testing.T) { + ci.Parallel(t) + + srv, cleanupSrv := TestServer(t, nil) + t.Cleanup(cleanupSrv) + testutil.WaitForLeader(t, srv.RPC) + t.Log("server started 👍") + + _, fake, _, fakeVolID := testClientWithCSI(t, srv) + + endpoint := NewCSIVolumeEndpoint(srv, nil) + plug, vol, err := endpoint.volAndPluginLookup(structs.DefaultNamespace, fakeVolID) + must.NoError(t, err) + + // ensure nil checks + expectErr := "unexpected nil value" + err = endpoint.expandVolume(nil, plug, &csi.CapacityRange{}) + must.EqError(t, err, expectErr) + err = endpoint.expandVolume(vol, nil, &csi.CapacityRange{}) + must.EqError(t, err, expectErr) + err = endpoint.expandVolume(vol, plug, nil) + must.EqError(t, err, expectErr) + + // these tests must be run in order, as they mutate vol along the way + cases := []struct { + Name string + + NewMin int64 + NewMax int64 + + ExpectMin int64 + ExpectMax int64 + ControllerResp int64 // new capacity for the mock controller response + ExpectCapacity int64 // expected resulting capacity on the volume + ExpectErr string + }{ + { + // successful expansion from initial vol with no capacity values. + Name: "success", + NewMin: 1000, + NewMax: 2000, + + ExpectMin: 1000, + ExpectMax: 2000, + ControllerResp: 1000, + ExpectCapacity: 1000, + }, + { + // with min/max both zero, no action should be taken, + // so expect no change to desired or actual capacity on the volume. + Name: "zero", + NewMin: 0, + NewMax: 0, + + ExpectMin: 1000, + ExpectMax: 2000, + ControllerResp: 999999, // this should not come into play + ExpectCapacity: 1000, + }, + { + // increasing min is what actually triggers an expand to occur. + Name: "increase min", + NewMin: 1500, + NewMax: 2000, + + ExpectMin: 1500, + ExpectMax: 2000, + ControllerResp: 1500, + ExpectCapacity: 1500, + }, + { + // min going down is okay, but no expand should occur. + Name: "reduce min", + NewMin: 500, + NewMax: 2000, + + ExpectMin: 500, + ExpectMax: 2000, + ControllerResp: 999999, + ExpectCapacity: 1500, + }, + { + // max going up is okay, but no expand should occur. + Name: "increase max", + NewMin: 500, + NewMax: 5000, + + ExpectMin: 500, + ExpectMax: 5000, + ControllerResp: 999999, + ExpectCapacity: 1500, + }, + { + // max lower than min is logically impossible. + Name: "max below min", + NewMin: 3, + NewMax: 2, + ExpectErr: "max requested capacity (2 B) less than or equal to min (3 B)", + }, + { + // volume size cannot be reduced. + Name: "max below current", + NewMax: 2, + ExpectErr: "max requested capacity (2 B) less than or equal to current (1.5 kB)", + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + fake.NextControllerExpandVolumeResponse = &cstructs.ClientCSIControllerExpandVolumeResponse{ + CapacityBytes: tc.ControllerResp, + NodeExpansionRequired: true, + } + + err = endpoint.expandVolume(vol, plug, &csi.CapacityRange{ + RequiredBytes: tc.NewMin, + LimitBytes: tc.NewMax, + }) + + if tc.ExpectErr != "" { + must.EqError(t, err, tc.ExpectErr) + return + } + + must.NoError(t, err) + + test.Eq(t, tc.ExpectCapacity, vol.Capacity, + test.Sprint("unexpected capacity")) + test.Eq(t, tc.ExpectMin, vol.RequestedCapacityMin, + test.Sprint("unexpected min")) + test.Eq(t, tc.ExpectMax, vol.RequestedCapacityMax, + test.Sprint("unexpected max")) + }) + } + +} + func TestCSIPluginEndpoint_RegisterViaFingerprint(t *testing.T) { ci.Parallel(t) srv, shutdown := TestServer(t, func(c *Config) { @@ -2043,3 +2258,78 @@ func TestCSI_SerializedControllerRPC(t *testing.T) { must.GreaterEq(t, 50*time.Millisecond, totals["plugin2"]) must.Less(t, 100*time.Millisecond, totals["plugin2"]) } + +// testClientWithCSI sets up a client with a fake CSI plugin. +// Much of the plugin/volume configuration is only to pass validation; +// callers should modify MockClientCSI's Next* fields. +func testClientWithCSI(t *testing.T, srv *Server) (c *client.Client, m *MockClientCSI, plugID, volID string) { + t.Helper() + + m = newMockClientCSI() + plugID = "fake-plugin" + volID = "fake-volume" + + c, cleanup := client.TestClientWithRPCs(t, + func(c *cconfig.Config) { + c.Servers = []string{srv.config.RPCAddr.String()} + c.Node.CSIControllerPlugins = map[string]*structs.CSIInfo{ + plugID: { + PluginID: plugID, + Healthy: true, + ControllerInfo: &structs.CSIControllerInfo{ + // Supports.* everything, but Next* values must be set on the mock. + SupportsAttachDetach: true, + SupportsClone: true, + SupportsCondition: true, + SupportsCreateDelete: true, + SupportsCreateDeleteSnapshot: true, + SupportsExpand: true, + SupportsGet: true, + SupportsGetCapacity: true, + SupportsListSnapshots: true, + SupportsListVolumes: true, + SupportsListVolumesAttachedNodes: true, + SupportsReadOnlyAttach: true, + }, + RequiresControllerPlugin: true, + }, + } + c.Node.CSINodePlugins = map[string]*structs.CSIInfo{ + plugID: { + PluginID: plugID, + Healthy: true, + NodeInfo: &structs.CSINodeInfo{ + ID: c.Node.GetID(), + SupportsCondition: true, + SupportsExpand: true, + SupportsStats: true, + }, + }, + } + }, + map[string]interface{}{"CSI": m}, // MockClientCSI + ) + t.Cleanup(func() { test.NoError(t, cleanup()) }) + testutil.WaitForClient(t, srv.RPC, c.NodeID(), c.Region()) + t.Log("client started with fake CSI plugin 👍") + + // Register a minimum-viable fake volume + req := &structs.CSIVolumeRegisterRequest{ + Volumes: []*structs.CSIVolume{{ + PluginID: plugID, + ID: volID, + Namespace: structs.DefaultNamespace, + RequestedCapabilities: []*structs.CSIVolumeCapability{ + { + AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + }, + }, + }}, + WriteRequest: structs.WriteRequest{Region: srv.Region()}, + } + must.NoError(t, srv.RPC("CSIVolume.Register", req, &structs.CSIVolumeRegisterResponse{})) + t.Logf("CSI volume %s registered 👍", volID) + + return c, m, plugID, volID +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index bc21105aec6..8b03bd5e49e 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2421,21 +2421,21 @@ func (s *StateStore) UpsertCSIVolume(index uint64, volumes []*structs.CSIVolume) } if obj != nil { // Allow some properties of a volume to be updated in place, but - // prevent accidentally overwriting important properties, or - // overwriting a volume in use + // prevent accidentally overwriting important properties. old := obj.(*structs.CSIVolume) if old.ExternalID != v.ExternalID || old.PluginID != v.PluginID || old.Provider != v.Provider { return fmt.Errorf("volume identity cannot be updated: %s", v.ID) } - s.CSIVolumeDenormalize(nil, old.Copy()) - if old.InUse() { - return fmt.Errorf("volume cannot be updated while in use") - } - v.CreateIndex = old.CreateIndex + // Update fields that are safe to change while volume is being used. + if err := old.UpdateSafeFields(v); err != nil { + return fmt.Errorf("unable to update in-use volume: %w", err) + } + v = old v.ModifyIndex = index + } else { v.CreateIndex = index v.ModifyIndex = index diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 9f67e78c6c1..03109d6afeb 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -3869,11 +3869,7 @@ func TestStateStore_CSIVolume(t *testing.T) { vs = slurp(iter) require.True(t, vs[0].ReadSchedulable()) - // registration is an error when the volume is in use - index++ - err = state.UpsertCSIVolume(index, []*structs.CSIVolume{v0}) - require.Error(t, err, "volume re-registered while in use") - // as is deregistration + // deregistration is an error when the volume is in use index++ err = state.CSIVolumeDeregister(index, ns, []string{vol0}, false) require.Error(t, err, "volume deregistered while in use") diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go index b8140f0266d..dc3467e410a 100644 --- a/nomad/structs/csi.go +++ b/nomad/structs/csi.go @@ -12,6 +12,7 @@ import ( "time" multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/helper" ) @@ -816,20 +817,6 @@ func (v *CSIVolume) Merge(other *CSIVolume) error { "volume snapshot ID cannot be updated")) } - // must be compatible with capacity range - // TODO: when ExpandVolume is implemented we'll need to update - // this logic https://github.com/hashicorp/nomad/issues/10324 - if v.Capacity != 0 { - if other.RequestedCapacityMax < v.Capacity || - other.RequestedCapacityMin > v.Capacity { - errs = multierror.Append(errs, errors.New( - "volume requested capacity update was not compatible with existing capacity")) - } else { - v.RequestedCapacityMin = other.RequestedCapacityMin - v.RequestedCapacityMax = other.RequestedCapacityMax - } - } - // must be compatible with volume_capabilities if v.AccessMode != CSIVolumeAccessModeUnknown || v.AttachmentMode != CSIVolumeAttachmentModeUnknown { @@ -880,6 +867,20 @@ func (v *CSIVolume) Merge(other *CSIVolume) error { return errs.ErrorOrNil() } +// UpdateSafeFields updates fields that may be mutated while the volume is in use. +func (v *CSIVolume) UpdateSafeFields(other *CSIVolume) error { + if v == nil || other == nil { + return errors.New("unexpected nil volume (this is a bug)") + } + + // Expand operation can sometimes happen while in-use. + v.Capacity = other.Capacity + v.RequestedCapacityMin = other.RequestedCapacityMin + v.RequestedCapacityMax = other.RequestedCapacityMax + + return nil +} + // Request and response wrappers type CSIVolumeRegisterRequest struct { Volumes []*CSIVolume @@ -920,6 +921,19 @@ type CSIVolumeDeleteResponse struct { QueryMeta } +type CSIVolumeExpandRequest struct { + VolumeID string + RequestedCapacityMin int64 + RequestedCapacityMax int64 + Secrets CSISecrets + WriteRequest +} + +type CSIVolumeExpandResponse struct { + CapacityBytes int64 + QueryMeta +} + type CSIVolumeClaimMode int const ( diff --git a/nomad/structs/csi_test.go b/nomad/structs/csi_test.go index ba57dc4ab80..b4f20b893cf 100644 --- a/nomad/structs/csi_test.go +++ b/nomad/structs/csi_test.go @@ -595,17 +595,6 @@ func TestCSIVolume_Merge(t *testing.T) { expected string expectFn func(t *testing.T, v *CSIVolume) }{ - { - name: "invalid capacity update", - v: &CSIVolume{Capacity: 100}, - update: &CSIVolume{ - RequestedCapacityMax: 300, RequestedCapacityMin: 200}, - expected: "volume requested capacity update was not compatible with existing capacity", - expectFn: func(t *testing.T, v *CSIVolume) { - require.NotEqual(t, 300, v.RequestedCapacityMax) - require.NotEqual(t, 200, v.RequestedCapacityMin) - }, - }, { name: "invalid capability update", v: &CSIVolume{ diff --git a/plugins/csi/client.go b/plugins/csi/client.go index dac79a321b4..0f101f4c5a5 100644 --- a/plugins/csi/client.go +++ b/plugins/csi/client.go @@ -14,14 +14,15 @@ import ( csipbv1 "github.com/container-storage-interface/spec/lib/go/csi" "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" - "github.com/hashicorp/nomad/helper/grpc-middleware/logging" - "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/plugins/base" - "github.com/hashicorp/nomad/plugins/shared/hclspec" "golang.org/x/exp/maps" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + + "github.com/hashicorp/nomad/helper/grpc-middleware/logging" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/base" + "github.com/hashicorp/nomad/plugins/shared/hclspec" ) // PluginTypeCSI implements the CSI plugin interface @@ -75,6 +76,7 @@ type CSIControllerClient interface { CreateVolume(ctx context.Context, in *csipbv1.CreateVolumeRequest, opts ...grpc.CallOption) (*csipbv1.CreateVolumeResponse, error) ListVolumes(ctx context.Context, in *csipbv1.ListVolumesRequest, opts ...grpc.CallOption) (*csipbv1.ListVolumesResponse, error) DeleteVolume(ctx context.Context, in *csipbv1.DeleteVolumeRequest, opts ...grpc.CallOption) (*csipbv1.DeleteVolumeResponse, error) + ControllerExpandVolume(ctx context.Context, in *csipbv1.ControllerExpandVolumeRequest, opts ...grpc.CallOption) (*csipbv1.ControllerExpandVolumeResponse, error) CreateSnapshot(ctx context.Context, in *csipbv1.CreateSnapshotRequest, opts ...grpc.CallOption) (*csipbv1.CreateSnapshotResponse, error) DeleteSnapshot(ctx context.Context, in *csipbv1.DeleteSnapshotRequest, opts ...grpc.CallOption) (*csipbv1.DeleteSnapshotResponse, error) ListSnapshots(ctx context.Context, in *csipbv1.ListSnapshotsRequest, opts ...grpc.CallOption) (*csipbv1.ListSnapshotsResponse, error) @@ -89,6 +91,7 @@ type CSINodeClient interface { NodeUnstageVolume(ctx context.Context, in *csipbv1.NodeUnstageVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeUnstageVolumeResponse, error) NodePublishVolume(ctx context.Context, in *csipbv1.NodePublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodePublishVolumeResponse, error) NodeUnpublishVolume(ctx context.Context, in *csipbv1.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeUnpublishVolumeResponse, error) + NodeExpandVolume(ctx context.Context, in *csipbv1.NodeExpandVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeExpandVolumeResponse, error) } type client struct { @@ -510,6 +513,44 @@ func (c *client) ControllerDeleteVolume(ctx context.Context, req *ControllerDele return err } +func (c *client) ControllerExpandVolume(ctx context.Context, req *ControllerExpandVolumeRequest, opts ...grpc.CallOption) (*ControllerExpandVolumeResponse, error) { + if err := req.Validate(); err != nil { + return nil, err + } + if err := c.ensureConnected(ctx); err != nil { + return nil, err + } + + exReq := req.ToCSIRepresentation() + resp, err := c.controllerClient.ControllerExpandVolume(ctx, exReq, opts...) + if err != nil { + code := status.Code(err) + switch code { + case codes.InvalidArgument: + return nil, fmt.Errorf( + "requested capabilities not compatible with volume %q: %v", + req.ExternalVolumeID, err) + case codes.NotFound: + err = fmt.Errorf("volume %q could not be found: %v", req.ExternalVolumeID, err) + case codes.FailedPrecondition: + err = fmt.Errorf("volume %q cannot be expanded online: %v", req.ExternalVolumeID, err) + case codes.OutOfRange: + return nil, fmt.Errorf( + "unsupported capacity_range for volume %q: %v", req.ExternalVolumeID, err) + case codes.Internal: + err = fmt.Errorf("controller plugin returned an internal error, check the plugin allocation logs for more information: %v", err) + default: + err = fmt.Errorf("controller plugin returned an error: %v", err) + } + return nil, err + } + + return &ControllerExpandVolumeResponse{ + CapacityBytes: resp.GetCapacityBytes(), + NodeExpansionRequired: resp.GetNodeExpansionRequired(), + }, nil +} + // compareCapabilities returns an error if the 'got' capabilities aren't found // within the 'expected' capability. // @@ -883,3 +924,7 @@ func (c *client) NodeUnpublishVolume(ctx context.Context, volumeID, targetPath s return err } + +func (c *client) NodeExpandVolume(ctx context.Context, req *NodeExpandVolumeRequest, opts ...grpc.CallOption) (*NodeExpandVolumeResponse, error) { + return nil, nil +} diff --git a/plugins/csi/client_test.go b/plugins/csi/client_test.go index 3f4dd7fb8d7..6947a953eea 100644 --- a/plugins/csi/client_test.go +++ b/plugins/csi/client_test.go @@ -13,14 +13,16 @@ import ( csipbv1 "github.com/container-storage-interface/spec/lib/go/csi" "github.com/golang/protobuf/ptypes/wrappers" - "github.com/hashicorp/nomad/ci" - "github.com/hashicorp/nomad/nomad/structs" - fake "github.com/hashicorp/nomad/plugins/csi/testing" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/nomad/structs" + fake "github.com/hashicorp/nomad/plugins/csi/testing" ) func newTestClient(t *testing.T) (*fake.IdentityClient, *fake.ControllerClient, *fake.NodeClient, CSIPlugin) { @@ -42,6 +44,9 @@ func newTestClient(t *testing.T) (*fake.IdentityClient, *fake.ControllerClient, controllerClient: cc, nodeClient: nc, } + t.Cleanup(func() { + _ = client.Close() + }) return ic, cc, nc, client } @@ -1170,6 +1175,155 @@ func TestClient_RPC_ControllerListSnapshots(t *testing.T) { } } +func TestClient_RPC_ControllerExpandVolume(t *testing.T) { + + cases := []struct { + Name string + Request *ControllerExpandVolumeRequest + ExpectCall *csipbv1.ControllerExpandVolumeRequest + ResponseErr error + ExpectedErr error + }{ + { + Name: "success", + Request: &ControllerExpandVolumeRequest{ + ExternalVolumeID: "vol-1", + RequiredBytes: 1, + LimitBytes: 2, + Capability: &VolumeCapability{ + AccessMode: VolumeAccessModeMultiNodeSingleWriter, + }, + Secrets: map[string]string{"super": "secret"}, + }, + ExpectCall: &csipbv1.ControllerExpandVolumeRequest{ + VolumeId: "vol-1", + CapacityRange: &csipbv1.CapacityRange{ + RequiredBytes: 1, + LimitBytes: 2, + }, + VolumeCapability: &csipbv1.VolumeCapability{ + AccessMode: &csipbv1.VolumeCapability_AccessMode{ + Mode: csipbv1.VolumeCapability_AccessMode_Mode(csipbv1.VolumeCapability_AccessMode_MULTI_NODE_SINGLE_WRITER), + }, + AccessType: &csipbv1.VolumeCapability_Block{Block: &csipbv1.VolumeCapability_BlockVolume{}}, + }, + Secrets: map[string]string{"super": "secret"}, + }, + }, + + { + Name: "validate only min set", + Request: &ControllerExpandVolumeRequest{ + ExternalVolumeID: "vol-1", + RequiredBytes: 4, + }, + ExpectCall: &csipbv1.ControllerExpandVolumeRequest{ + VolumeId: "vol-1", + CapacityRange: &csipbv1.CapacityRange{ + RequiredBytes: 4, + }, + }, + }, + { + Name: "validate missing volume ID", + Request: &ControllerExpandVolumeRequest{}, + ExpectedErr: errors.New("missing ExternalVolumeID"), + }, + { + Name: "validate missing max/min size", + Request: &ControllerExpandVolumeRequest{ + ExternalVolumeID: "vol-1", + }, + ExpectedErr: errors.New("one of LimitBytes or RequiredBytes must be set"), + }, + { + Name: "validate min greater than max", + Request: &ControllerExpandVolumeRequest{ + ExternalVolumeID: "vol-1", + RequiredBytes: 4, + LimitBytes: 2, + }, + ExpectedErr: errors.New("LimitBytes cannot be less than RequiredBytes"), + }, + + { + Name: "grpc error InvalidArgument", + Request: &ControllerExpandVolumeRequest{ + ExternalVolumeID: "vol-1", LimitBytes: 1000}, + ResponseErr: status.Errorf(codes.InvalidArgument, "sad args"), + ExpectedErr: errors.New("requested capabilities not compatible with volume \"vol-1\": rpc error: code = InvalidArgument desc = sad args"), + }, + + { + Name: "grpc error NotFound", + Request: &ControllerExpandVolumeRequest{ + ExternalVolumeID: "vol-1", LimitBytes: 1000}, + ResponseErr: status.Errorf(codes.NotFound, "does not exist"), + ExpectedErr: errors.New("volume \"vol-1\" could not be found: rpc error: code = NotFound desc = does not exist"), + }, + { + Name: "grpc error FailedPrecondition", + Request: &ControllerExpandVolumeRequest{ + ExternalVolumeID: "vol-1", LimitBytes: 1000}, + ResponseErr: status.Errorf(codes.FailedPrecondition, "unsupported"), + ExpectedErr: errors.New("volume \"vol-1\" cannot be expanded online: rpc error: code = FailedPrecondition desc = unsupported"), + }, + { + Name: "grpc error OutOfRange", + Request: &ControllerExpandVolumeRequest{ + ExternalVolumeID: "vol-1", LimitBytes: 1000}, + ResponseErr: status.Errorf(codes.OutOfRange, "too small"), + ExpectedErr: errors.New("unsupported capacity_range for volume \"vol-1\": rpc error: code = OutOfRange desc = too small"), + }, + { + Name: "grpc error Internal", + Request: &ControllerExpandVolumeRequest{ + ExternalVolumeID: "vol-1", LimitBytes: 1000}, + ResponseErr: status.Errorf(codes.Internal, "some grpc error"), + ExpectedErr: errors.New("controller plugin returned an internal error, check the plugin allocation logs for more information: rpc error: code = Internal desc = some grpc error"), + }, + { + Name: "grpc error default case", + Request: &ControllerExpandVolumeRequest{ + ExternalVolumeID: "vol-1", LimitBytes: 1000}, + ResponseErr: status.Errorf(codes.DataLoss, "misc unspecified error"), + ExpectedErr: errors.New("controller plugin returned an error: rpc error: code = DataLoss desc = misc unspecified error"), + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + _, cc, _, client := newTestClient(t) + + cc.NextErr = tc.ResponseErr + // the fake client should take ~no time, but set a timeout just in case + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50) + defer cancel() + resp, err := client.ControllerExpandVolume(ctx, tc.Request) + if tc.ExpectedErr != nil { + must.EqError(t, err, tc.ExpectedErr.Error()) + return + } + must.NoError(t, err) + must.NotNil(t, resp) + must.Eq(t, tc.ExpectCall, cc.LastExpandVolumeRequest) + + }) + } + + t.Run("connection error", func(t *testing.T) { + c := &client{} // induce c.ensureConnected() error + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50) + defer cancel() + resp, err := c.ControllerExpandVolume(ctx, &ControllerExpandVolumeRequest{ + ExternalVolumeID: "valid-id", + RequiredBytes: 1, + }) + must.Nil(t, resp) + must.EqError(t, err, "address is empty") + }) +} + func TestClient_RPC_NodeStageVolume(t *testing.T) { ci.Parallel(t) diff --git a/plugins/csi/fake/client.go b/plugins/csi/fake/client.go index 74f142d9615..22b8d46c01d 100644 --- a/plugins/csi/fake/client.go +++ b/plugins/csi/fake/client.go @@ -11,10 +11,11 @@ import ( "fmt" "sync" + "google.golang.org/grpc" + "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/csi" "github.com/hashicorp/nomad/plugins/shared/hclspec" - "google.golang.org/grpc" ) var _ csi.CSIPlugin = &Client{} @@ -78,6 +79,10 @@ type Client struct { NextControllerListSnapshotsErr error ControllerListSnapshotsCallCount int64 + NextControllerExpandVolumeResponse *csi.ControllerExpandVolumeResponse + NextControllerExpandVolumeErr error + ControllerExpandVolumeCallCount int64 + NextNodeGetCapabilitiesResponse *csi.NodeCapabilitySet NextNodeGetCapabilitiesErr error NodeGetCapabilitiesCallCount int64 @@ -98,6 +103,10 @@ type Client struct { NextNodeUnpublishVolumeErr error NodeUnpublishVolumeCallCount int64 + + NextNodeExpandVolumeResponse *csi.NodeExpandVolumeResponse + NextNodeExpandVolumeErr error + NodeExpandVolumeCallCount int64 } // PluginInfo describes the type and version of a plugin. @@ -235,6 +244,13 @@ func (c *Client) ControllerListSnapshots(ctx context.Context, req *csi.Controlle return c.NextControllerListSnapshotsResponse, c.NextControllerListSnapshotsErr } +func (c *Client) ControllerExpandVolume(ctx context.Context, in *csi.ControllerExpandVolumeRequest, opts ...grpc.CallOption) (*csi.ControllerExpandVolumeResponse, error) { + c.Mu.Lock() + defer c.Mu.Unlock() + c.ControllerExpandVolumeCallCount++ + return c.NextControllerExpandVolumeResponse, c.NextControllerExpandVolumeErr +} + func (c *Client) NodeGetCapabilities(ctx context.Context) (*csi.NodeCapabilitySet, error) { c.Mu.Lock() defer c.Mu.Unlock() @@ -300,6 +316,14 @@ func (c *Client) NodeUnpublishVolume(ctx context.Context, volumeID, targetPath s return c.NextNodeUnpublishVolumeErr } +func (c *Client) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest, opts ...grpc.CallOption) (*csi.NodeExpandVolumeResponse, error) { + c.Mu.Lock() + defer c.Mu.Unlock() + + c.NodeExpandVolumeCallCount++ + return c.NextNodeExpandVolumeResponse, c.NextNodeExpandVolumeErr +} + // Close the client and ensure any connections are cleaned up. func (c *Client) Close() error { @@ -325,6 +349,9 @@ func (c *Client) Close() error { c.NextControllerUnpublishVolumeResponse = nil c.NextControllerUnpublishVolumeErr = fmt.Errorf("closed client") + c.NextControllerExpandVolumeResponse = nil + c.NextControllerExpandVolumeErr = fmt.Errorf("closed client") + c.NextControllerValidateVolumeErr = fmt.Errorf("closed client") c.NextNodeGetCapabilitiesResponse = nil @@ -341,5 +368,8 @@ func (c *Client) Close() error { c.NextNodeUnpublishVolumeErr = fmt.Errorf("closed client") + c.NextNodeExpandVolumeResponse = nil + c.NextNodeExpandVolumeErr = fmt.Errorf("closed client") + return nil } diff --git a/plugins/csi/plugin.go b/plugins/csi/plugin.go index 9a9b5f089ce..df3c9c4fd01 100644 --- a/plugins/csi/plugin.go +++ b/plugins/csi/plugin.go @@ -9,9 +9,10 @@ import ( "fmt" csipbv1 "github.com/container-storage-interface/spec/lib/go/csi" + "google.golang.org/grpc" + "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/base" - "google.golang.org/grpc" ) // CSIPlugin implements a lightweight abstraction layer around a CSI Plugin. @@ -60,6 +61,9 @@ type CSIPlugin interface { // external storage provider ControllerListVolumes(ctx context.Context, req *ControllerListVolumesRequest, opts ...grpc.CallOption) (*ControllerListVolumesResponse, error) + // ControllerExpandVolume is used to expand a volume's size + ControllerExpandVolume(ctx context.Context, req *ControllerExpandVolumeRequest, opts ...grpc.CallOption) (*ControllerExpandVolumeResponse, error) + // ControllerCreateSnapshot is used to create a volume snapshot in the // external storage provider ControllerCreateSnapshot(ctx context.Context, req *ControllerCreateSnapshotRequest, opts ...grpc.CallOption) (*ControllerCreateSnapshotResponse, error) @@ -101,6 +105,11 @@ type CSIPlugin interface { // for the given volume. NodeUnpublishVolume(ctx context.Context, volumeID, targetPath string, opts ...grpc.CallOption) error + // NodeExpandVolume is used to expand a volume. This MUST be called after + // any ControllerExpandVolume is called, but only if that RPC indicates + // that node expansion is required + NodeExpandVolume(ctx context.Context, req *NodeExpandVolumeRequest, opts ...grpc.CallOption) (*NodeExpandVolumeResponse, error) + // Shutdown the client and ensure any connections are cleaned up. Close() error } @@ -492,7 +501,8 @@ func (r *ControllerCreateVolumeRequest) Validate() error { return errors.New( "one of LimitBytes or RequiredBytes must be set if CapacityRange is set") } - if r.CapacityRange.LimitBytes < r.CapacityRange.RequiredBytes { + if r.CapacityRange.LimitBytes > 0 && + r.CapacityRange.LimitBytes < r.CapacityRange.RequiredBytes { return errors.New("LimitBytes cannot be less than RequiredBytes") } } @@ -625,6 +635,49 @@ func (r *ControllerDeleteVolumeRequest) Validate() error { return nil } +type ControllerExpandVolumeRequest struct { + ExternalVolumeID string + RequiredBytes int64 + LimitBytes int64 + Capability *VolumeCapability + Secrets structs.CSISecrets +} + +func (r *ControllerExpandVolumeRequest) Validate() error { + if r.ExternalVolumeID == "" { + return errors.New("missing ExternalVolumeID") + } + if r.LimitBytes == 0 && r.RequiredBytes == 0 { + return errors.New("one of LimitBytes or RequiredBytes must be set") + } + // per the spec: "A value of 0 is equal to an unspecified field value." + // so in this case, only error if both are set. + if r.LimitBytes > 0 && (r.LimitBytes < r.RequiredBytes) { + return errors.New("LimitBytes cannot be less than RequiredBytes") + } + return nil +} + +func (r *ControllerExpandVolumeRequest) ToCSIRepresentation() *csipbv1.ControllerExpandVolumeRequest { + if r == nil { + return nil + } + return &csipbv1.ControllerExpandVolumeRequest{ + VolumeId: r.ExternalVolumeID, + CapacityRange: &csipbv1.CapacityRange{ + RequiredBytes: r.RequiredBytes, + LimitBytes: r.LimitBytes, + }, + Secrets: r.Secrets, + VolumeCapability: r.Capability.ToCSIRepresentation(), + } +} + +type ControllerExpandVolumeResponse struct { + CapacityBytes int64 + NodeExpansionRequired bool +} + type ControllerListVolumesRequest struct { MaxEntries int32 StartingToken string @@ -976,3 +1029,32 @@ func (c *CapacityRange) ToCSIRepresentation() *csipbv1.CapacityRange { LimitBytes: c.LimitBytes, } } + +type NodeExpandVolumeRequest struct { + ExternalVolumeID string + RequiredBytes int64 + LimitBytes int64 + TargetPath string + StagingPath string + Capability *VolumeCapability +} + +func (r *NodeExpandVolumeRequest) ToCSIRepresentation() *csipbv1.NodeExpandVolumeRequest { + if r == nil { + return nil + } + return &csipbv1.NodeExpandVolumeRequest{ + VolumeId: r.ExternalVolumeID, + VolumePath: r.TargetPath, + CapacityRange: &csipbv1.CapacityRange{ + RequiredBytes: r.RequiredBytes, + LimitBytes: r.LimitBytes, + }, + StagingTargetPath: r.StagingPath, + VolumeCapability: r.Capability.ToCSIRepresentation(), + } +} + +type NodeExpandVolumeResponse struct { + CapacityBytes int64 +} diff --git a/plugins/csi/testing/client.go b/plugins/csi/testing/client.go index 55f515c30b9..7595d79f2f6 100644 --- a/plugins/csi/testing/client.go +++ b/plugins/csi/testing/client.go @@ -54,6 +54,8 @@ type ControllerClient struct { NextUnpublishVolumeResponse *csipbv1.ControllerUnpublishVolumeResponse NextValidateVolumeCapabilitiesResponse *csipbv1.ValidateVolumeCapabilitiesResponse NextCreateVolumeResponse *csipbv1.CreateVolumeResponse + NextExpandVolumeResponse *csipbv1.ControllerExpandVolumeResponse + LastExpandVolumeRequest *csipbv1.ControllerExpandVolumeRequest NextDeleteVolumeResponse *csipbv1.DeleteVolumeResponse NextListVolumesResponse *csipbv1.ListVolumesResponse NextCreateSnapshotResponse *csipbv1.CreateSnapshotResponse @@ -73,6 +75,8 @@ func (c *ControllerClient) Reset() { c.NextUnpublishVolumeResponse = nil c.NextValidateVolumeCapabilitiesResponse = nil c.NextCreateVolumeResponse = nil + c.NextExpandVolumeResponse = nil + c.LastExpandVolumeRequest = nil c.NextDeleteVolumeResponse = nil c.NextListVolumesResponse = nil c.NextCreateSnapshotResponse = nil @@ -111,6 +115,11 @@ func (c *ControllerClient) CreateVolume(ctx context.Context, in *csipbv1.CreateV return c.NextCreateVolumeResponse, c.NextErr } +func (c *ControllerClient) ControllerExpandVolume(ctx context.Context, in *csipbv1.ControllerExpandVolumeRequest, opts ...grpc.CallOption) (*csipbv1.ControllerExpandVolumeResponse, error) { + c.LastExpandVolumeRequest = in + return c.NextExpandVolumeResponse, c.NextErr +} + func (c *ControllerClient) DeleteVolume(ctx context.Context, in *csipbv1.DeleteVolumeRequest, opts ...grpc.CallOption) (*csipbv1.DeleteVolumeResponse, error) { return c.NextDeleteVolumeResponse, c.NextErr } @@ -140,6 +149,7 @@ type NodeClient struct { NextUnstageVolumeResponse *csipbv1.NodeUnstageVolumeResponse NextPublishVolumeResponse *csipbv1.NodePublishVolumeResponse NextUnpublishVolumeResponse *csipbv1.NodeUnpublishVolumeResponse + NextExpandVolumeResponse *csipbv1.NodeExpandVolumeResponse } // NewNodeClient returns a new stub NodeClient @@ -155,6 +165,7 @@ func (c *NodeClient) Reset() { c.NextUnstageVolumeResponse = nil c.NextPublishVolumeResponse = nil c.NextUnpublishVolumeResponse = nil + c.NextExpandVolumeResponse = nil } func (c *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipbv1.NodeGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipbv1.NodeGetCapabilitiesResponse, error) { @@ -180,3 +191,7 @@ func (c *NodeClient) NodePublishVolume(ctx context.Context, in *csipbv1.NodePubl func (c *NodeClient) NodeUnpublishVolume(ctx context.Context, in *csipbv1.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeUnpublishVolumeResponse, error) { return c.NextUnpublishVolumeResponse, c.NextErr } + +func (c *NodeClient) NodeExpandVolume(ctx context.Context, in *csipbv1.NodeExpandVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeExpandVolumeResponse, error) { + return c.NextExpandVolumeResponse, c.NextErr +}