Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

csi: add VolumeContext to NodeStage/Publish RPCs #8239

Merged
merged 1 commit into from
Jun 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions client/pluginmanager/csimanager/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,18 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume,
return err
}

req := &csi.NodeStageVolumeRequest{
ExternalID: vol.RemoteID(),
PublishContext: publishContext,
StagingTargetPath: pluginStagingPath,
VolumeCapability: capability,
Secrets: vol.Secrets,
VolumeContext: vol.Context,
}

// CSI NodeStageVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
return v.plugin.NodeStageVolume(ctx,
vol.RemoteID(),
publishContext,
pluginStagingPath,
capability,
vol.Secrets,
return v.plugin.NodeStageVolume(ctx, req,
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
Expand Down Expand Up @@ -210,6 +214,7 @@ func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolum
VolumeCapability: capabilities,
Readonly: usage.ReadOnly,
Secrets: vol.Secrets,
VolumeContext: vol.Context,
},
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
grpc_retry.WithMax(3),
Expand Down
30 changes: 8 additions & 22 deletions plugins/csi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/grpc-middleware/logging"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"google.golang.org/grpc"
Expand Down Expand Up @@ -496,46 +495,33 @@ func (c *client) NodeGetInfo(ctx context.Context) (*NodeGetInfoResponse, error)
return result, nil
}

func (c *client) NodeStageVolume(ctx context.Context, volumeID string, publishContext map[string]string, stagingTargetPath string, capabilities *VolumeCapability, secrets structs.CSISecrets, opts ...grpc.CallOption) error {
func (c *client) NodeStageVolume(ctx context.Context, req *NodeStageVolumeRequest, opts ...grpc.CallOption) error {
if c == nil {
return fmt.Errorf("Client not initialized")
}
if c.nodeClient == nil {
return fmt.Errorf("Client not initialized")
}

// These errors should not be returned during production use but exist as aids
// during Nomad development
if volumeID == "" {
return fmt.Errorf("missing volumeID")
}
if stagingTargetPath == "" {
return fmt.Errorf("missing stagingTargetPath")
}

req := &csipbv1.NodeStageVolumeRequest{
VolumeId: volumeID,
PublishContext: publishContext,
StagingTargetPath: stagingTargetPath,
VolumeCapability: capabilities.ToCSIRepresentation(),
Secrets: secrets,
err := req.Validate()
if err != nil {
return err
}

// NodeStageVolume's response contains no extra data. If err == nil, we were
// successful.
_, err := c.nodeClient.NodeStageVolume(ctx, req, opts...)
_, err = c.nodeClient.NodeStageVolume(ctx, req.ToCSIRepresentation(), opts...)
if err != nil {
code := status.Code(err)
switch code {
case codes.NotFound:
err = fmt.Errorf("volume %q could not be found: %v", volumeID, err)
err = fmt.Errorf("volume %q could not be found: %v", req.ExternalID, err)
case codes.AlreadyExists:
err = fmt.Errorf(
"volume %q is already staged to %q but with incompatible capabilities for this request: %v",
volumeID, stagingTargetPath, err)
req.ExternalID, req.StagingTargetPath, err)
case codes.FailedPrecondition:
err = fmt.Errorf("volume %q is already published on another node and does not have MULTI_NODE volume capability: %v",
volumeID, err)
req.ExternalID, err)
case codes.Internal:
err = fmt.Errorf("node plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
}
Expand Down
7 changes: 5 additions & 2 deletions plugins/csi/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,8 +628,11 @@ func TestClient_RPC_NodeStageVolume(t *testing.T) {
nc.NextErr = tc.ResponseErr
nc.NextStageVolumeResponse = tc.Response

err := client.NodeStageVolume(context.TODO(), "foo", nil, "/path",
&VolumeCapability{}, structs.CSISecrets{})
err := client.NodeStageVolume(context.TODO(), &NodeStageVolumeRequest{
ExternalID: "foo",
StagingTargetPath: "/path",
VolumeCapability: &VolumeCapability{},
})
if tc.ExpectedErr != nil {
require.EqualError(t, err, tc.ExpectedErr.Error())
} else {
Expand Down
3 changes: 1 addition & 2 deletions plugins/csi/fake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"sync"

"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/csi"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
Expand Down Expand Up @@ -192,7 +191,7 @@ func (c *Client) NodeGetInfo(ctx context.Context) (*csi.NodeGetInfoResponse, err
// NodeStageVolume is used when a plugin has the STAGE_UNSTAGE volume capability
// to prepare a volume for usage on a host. If err == nil, the response should
// be assumed to be successful.
func (c *Client) NodeStageVolume(ctx context.Context, volumeID string, publishContext map[string]string, stagingTargetPath string, capabilities *csi.VolumeCapability, secrets structs.CSISecrets, opts ...grpc.CallOption) error {
func (c *Client) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest, opts ...grpc.CallOption) error {
c.Mu.Lock()
defer c.Mu.Unlock()

Expand Down
71 changes: 70 additions & 1 deletion plugins/csi/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type CSIPlugin interface {
// NodeStageVolume is used when a plugin has the STAGE_UNSTAGE volume capability
// to prepare a volume for usage on a host. If err == nil, the response should
// be assumed to be successful.
NodeStageVolume(ctx context.Context, volumeID string, publishContext map[string]string, stagingTargetPath string, capabilities *VolumeCapability, secrets structs.CSISecrets, opts ...grpc.CallOption) error
NodeStageVolume(ctx context.Context, req *NodeStageVolumeRequest, opts ...grpc.CallOption) error

// NodeUnstageVolume is used when a plugin has the STAGE_UNSTAGE volume capability
// to undo the work performed by NodeStageVolume. If a volume has been staged,
Expand Down Expand Up @@ -114,6 +114,11 @@ type NodePublishVolumeRequest struct {
// Secrets required by plugins to complete the node publish volume
// request. This field is OPTIONAL.
Secrets structs.CSISecrets

// Volume context as returned by SP in the CSI
// CreateVolumeResponse.Volume.volume_context which we don't implement but
// can be entered by hand in the volume spec. This field is OPTIONAL.
VolumeContext map[string]string
}

func (r *NodePublishVolumeRequest) ToCSIRepresentation() *csipbv1.NodePublishVolumeRequest {
Expand All @@ -129,6 +134,7 @@ func (r *NodePublishVolumeRequest) ToCSIRepresentation() *csipbv1.NodePublishVol
VolumeCapability: r.VolumeCapability.ToCSIRepresentation(),
Readonly: r.Readonly,
Secrets: r.Secrets,
VolumeContext: r.VolumeContext,
}
}

Expand All @@ -148,6 +154,69 @@ func (r *NodePublishVolumeRequest) Validate() error {
return nil
}

type NodeStageVolumeRequest struct {
// The external ID of the volume to stage.
ExternalID string

// If the volume was attached via a call to `ControllerPublishVolume` then
// we need to provide the returned PublishContext here.
PublishContext map[string]string

// The path to which the volume MAY be staged. It MUST be an
// absolute path in the root filesystem of the process serving this
// request, and MUST be a directory. The CO SHALL ensure that there
// is only one `staging_target_path` per volume. The CO SHALL ensure
// that the path is directory and that the process serving the
// request has `read` and `write` permission to that directory. The
// CO SHALL be responsible for creating the directory if it does not
// exist.
// This is a REQUIRED field.
StagingTargetPath string

// Volume capability describing how the CO intends to use this volume.
VolumeCapability *VolumeCapability

// Secrets required by plugins to complete the node stage volume
// request. This field is OPTIONAL.
Secrets structs.CSISecrets

// Volume context as returned by SP in the CSI
// CreateVolumeResponse.Volume.volume_context which we don't implement but
// can be entered by hand in the volume spec. This field is OPTIONAL.
VolumeContext map[string]string
}

func (r *NodeStageVolumeRequest) ToCSIRepresentation() *csipbv1.NodeStageVolumeRequest {
if r == nil {
return nil
}

return &csipbv1.NodeStageVolumeRequest{
VolumeId: r.ExternalID,
PublishContext: r.PublishContext,
StagingTargetPath: r.StagingTargetPath,
VolumeCapability: r.VolumeCapability.ToCSIRepresentation(),
Secrets: r.Secrets,
VolumeContext: r.VolumeContext,
}
}

func (r *NodeStageVolumeRequest) Validate() error {
tgross marked this conversation as resolved.
Show resolved Hide resolved
if r.ExternalID == "" {
return errors.New("missing volume ID")
}

if r.StagingTargetPath == "" {
return errors.New("missing StagingTargetPath")
}

if r.VolumeCapability == nil {
return errors.New("missing VolumeCapabilities")
}

return nil
}

type PluginCapabilitySet struct {
hasControllerService bool
hasTopologies bool
Expand Down