Skip to content

Commit

Permalink
csi: add VolumeContext to NodeStage/Publish RPCs
Browse files Browse the repository at this point in the history
In #7957 we added support for passing a volume context to the controller RPCs.
This is an opaque map that's created by `CreateVolume` or, in Nomad's case,
in the volume registration spec.

However, we missed passing this field to the `NodeStage` and `NodePublish` RPC,
which prevents certain plugins (such as MooseFS) from making node RPCs.
  • Loading branch information
tgross committed Jun 22, 2020
1 parent 3892522 commit 901664f
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 33 deletions.
17 changes: 11 additions & 6 deletions client/pluginmanager/csimanager/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,18 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume,
return err
}

req := &csi.NodeStageVolumeRequest{
ExternalID: vol.RemoteID(),
PublishContext: publishContext,
StagingTargetPath: pluginStagingPath,
VolumeCapability: capability,
Secrets: vol.Secrets,
VolumeContext: vol.Context,
}

// CSI NodeStageVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
return v.plugin.NodeStageVolume(ctx,
vol.RemoteID(),
publishContext,
pluginStagingPath,
capability,
vol.Secrets,
return v.plugin.NodeStageVolume(ctx, req,
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
Expand Down Expand Up @@ -210,6 +214,7 @@ func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolum
VolumeCapability: capabilities,
Readonly: usage.ReadOnly,
Secrets: vol.Secrets,
VolumeContext: vol.Context,
},
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
grpc_retry.WithMax(3),
Expand Down
30 changes: 8 additions & 22 deletions plugins/csi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/grpc-middleware/logging"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"google.golang.org/grpc"
Expand Down Expand Up @@ -496,46 +495,33 @@ func (c *client) NodeGetInfo(ctx context.Context) (*NodeGetInfoResponse, error)
return result, nil
}

func (c *client) NodeStageVolume(ctx context.Context, volumeID string, publishContext map[string]string, stagingTargetPath string, capabilities *VolumeCapability, secrets structs.CSISecrets, opts ...grpc.CallOption) error {
func (c *client) NodeStageVolume(ctx context.Context, req *NodeStageVolumeRequest, opts ...grpc.CallOption) error {
if c == nil {
return fmt.Errorf("Client not initialized")
}
if c.nodeClient == nil {
return fmt.Errorf("Client not initialized")
}

// These errors should not be returned during production use but exist as aids
// during Nomad development
if volumeID == "" {
return fmt.Errorf("missing volumeID")
}
if stagingTargetPath == "" {
return fmt.Errorf("missing stagingTargetPath")
}

req := &csipbv1.NodeStageVolumeRequest{
VolumeId: volumeID,
PublishContext: publishContext,
StagingTargetPath: stagingTargetPath,
VolumeCapability: capabilities.ToCSIRepresentation(),
Secrets: secrets,
err := req.Validate()
if err != nil {
return err
}

// NodeStageVolume's response contains no extra data. If err == nil, we were
// successful.
_, err := c.nodeClient.NodeStageVolume(ctx, req, opts...)
_, err = c.nodeClient.NodeStageVolume(ctx, req.ToCSIRepresentation(), opts...)
if err != nil {
code := status.Code(err)
switch code {
case codes.NotFound:
err = fmt.Errorf("volume %q could not be found: %v", volumeID, err)
err = fmt.Errorf("volume %q could not be found: %v", req.ExternalID, err)
case codes.AlreadyExists:
err = fmt.Errorf(
"volume %q is already staged to %q but with incompatible capabilities for this request: %v",
volumeID, stagingTargetPath, err)
req.ExternalID, req.StagingTargetPath, err)
case codes.FailedPrecondition:
err = fmt.Errorf("volume %q is already published on another node and does not have MULTI_NODE volume capability: %v",
volumeID, err)
req.ExternalID, err)
case codes.Internal:
err = fmt.Errorf("node plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
}
Expand Down
7 changes: 5 additions & 2 deletions plugins/csi/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,8 +628,11 @@ func TestClient_RPC_NodeStageVolume(t *testing.T) {
nc.NextErr = tc.ResponseErr
nc.NextStageVolumeResponse = tc.Response

err := client.NodeStageVolume(context.TODO(), "foo", nil, "/path",
&VolumeCapability{}, structs.CSISecrets{})
err := client.NodeStageVolume(context.TODO(), &NodeStageVolumeRequest{
ExternalID: "foo",
StagingTargetPath: "/path",
VolumeCapability: &VolumeCapability{},
})
if tc.ExpectedErr != nil {
require.EqualError(t, err, tc.ExpectedErr.Error())
} else {
Expand Down
3 changes: 1 addition & 2 deletions plugins/csi/fake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"sync"

"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/csi"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
Expand Down Expand Up @@ -192,7 +191,7 @@ func (c *Client) NodeGetInfo(ctx context.Context) (*csi.NodeGetInfoResponse, err
// NodeStageVolume is used when a plugin has the STAGE_UNSTAGE volume capability
// to prepare a volume for usage on a host. If err == nil, the response should
// be assumed to be successful.
func (c *Client) NodeStageVolume(ctx context.Context, volumeID string, publishContext map[string]string, stagingTargetPath string, capabilities *csi.VolumeCapability, secrets structs.CSISecrets, opts ...grpc.CallOption) error {
func (c *Client) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest, opts ...grpc.CallOption) error {
c.Mu.Lock()
defer c.Mu.Unlock()

Expand Down
71 changes: 70 additions & 1 deletion plugins/csi/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type CSIPlugin interface {
// NodeStageVolume is used when a plugin has the STAGE_UNSTAGE volume capability
// to prepare a volume for usage on a host. If err == nil, the response should
// be assumed to be successful.
NodeStageVolume(ctx context.Context, volumeID string, publishContext map[string]string, stagingTargetPath string, capabilities *VolumeCapability, secrets structs.CSISecrets, opts ...grpc.CallOption) error
NodeStageVolume(ctx context.Context, req *NodeStageVolumeRequest, opts ...grpc.CallOption) error

// NodeUnstageVolume is used when a plugin has the STAGE_UNSTAGE volume capability
// to undo the work performed by NodeStageVolume. If a volume has been staged,
Expand Down Expand Up @@ -114,6 +114,11 @@ type NodePublishVolumeRequest struct {
// Secrets required by plugins to complete the node publish volume
// request. This field is OPTIONAL.
Secrets structs.CSISecrets

// Volume context as returned by SP in the CSI
// CreateVolumeResponse.Volume.volume_context which we don't implement but
// can be entered by hand in the volume spec. This field is OPTIONAL.
VolumeContext map[string]string
}

func (r *NodePublishVolumeRequest) ToCSIRepresentation() *csipbv1.NodePublishVolumeRequest {
Expand All @@ -129,6 +134,7 @@ func (r *NodePublishVolumeRequest) ToCSIRepresentation() *csipbv1.NodePublishVol
VolumeCapability: r.VolumeCapability.ToCSIRepresentation(),
Readonly: r.Readonly,
Secrets: r.Secrets,
VolumeContext: r.VolumeContext,
}
}

Expand All @@ -148,6 +154,69 @@ func (r *NodePublishVolumeRequest) Validate() error {
return nil
}

type NodeStageVolumeRequest struct {
// The external ID of the volume to stage.
ExternalID string

// If the volume was attached via a call to `ControllerPublishVolume` then
// we need to provide the returned PublishContext here.
PublishContext map[string]string

// The path to which the volume MAY be staged. It MUST be an
// absolute path in the root filesystem of the process serving this
// request, and MUST be a directory. The CO SHALL ensure that there
// is only one `staging_target_path` per volume. The CO SHALL ensure
// that the path is directory and that the process serving the
// request has `read` and `write` permission to that directory. The
// CO SHALL be responsible for creating the directory if it does not
// exist.
// This is a REQUIRED field.
StagingTargetPath string

// Volume capability describing how the CO intends to use this volume.
VolumeCapability *VolumeCapability

// Secrets required by plugins to complete the node stage volume
// request. This field is OPTIONAL.
Secrets structs.CSISecrets

// Volume context as returned by SP in the CSI
// CreateVolumeResponse.Volume.volume_context which we don't implement but
// can be entered by hand in the volume spec. This field is OPTIONAL.
VolumeContext map[string]string
}

func (r *NodeStageVolumeRequest) ToCSIRepresentation() *csipbv1.NodeStageVolumeRequest {
if r == nil {
return nil
}

return &csipbv1.NodeStageVolumeRequest{
VolumeId: r.ExternalID,
PublishContext: r.PublishContext,
StagingTargetPath: r.StagingTargetPath,
VolumeCapability: r.VolumeCapability.ToCSIRepresentation(),
Secrets: r.Secrets,
VolumeContext: r.VolumeContext,
}
}

func (r *NodeStageVolumeRequest) Validate() error {
if r.ExternalID == "" {
return errors.New("missing volume ID")
}

if r.StagingTargetPath == "" {
return errors.New("missing StagingTargetPath")
}

if r.VolumeCapability == nil {
return errors.New("missing VolumeCapabilities")
}

return nil
}

type PluginCapabilitySet struct {
hasControllerService bool
hasTopologies bool
Expand Down

0 comments on commit 901664f

Please sign in to comment.