Skip to content

Commit

Permalink
csi: add grpc retries to client controller RPCs (#7549)
Browse files Browse the repository at this point in the history
The CSI Specification defines various gRPC Errors and how they may be retried. After auditing all our CSI RPC calls in #6863, this changeset:

* adds retries and backoffs to the RPCs where they were needed but not implemented
* annotates those CSI RPCs that do not need retries so that we don't wonder whether it's been left off accidentally
* adds a timeout and cancellation context to the `Probe` call, which didn't have one.
  • Loading branch information
tgross authored Mar 30, 2020
1 parent a86e575 commit ffa13ad
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 23 deletions.
23 changes: 20 additions & 3 deletions client/csi_controller_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

metrics "github.com/armon/go-metrics"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/plugins/csi"
Expand Down Expand Up @@ -55,7 +56,13 @@ func (c *CSIController) ValidateVolume(req *structs.ClientCSIControllerValidateV

ctx, cancelFn := c.requestContext()
defer cancelFn()
return plugin.ControllerValidateCapabilties(ctx, req.VolumeID, caps)

// CSI ValidateVolumeCapabilities errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
return plugin.ControllerValidateCapabilities(ctx, req.VolumeID, caps,
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
}

// AttachVolume is used to attach a volume from a CSI Cluster to
Expand Down Expand Up @@ -95,7 +102,12 @@ func (c *CSIController) AttachVolume(req *structs.ClientCSIControllerAttachVolum
// Submit the request for a volume to the CSI Plugin.
ctx, cancelFn := c.requestContext()
defer cancelFn()
cresp, err := plugin.ControllerPublishVolume(ctx, csiReq)
// CSI ControllerPublishVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
cresp, err := plugin.ControllerPublishVolume(ctx, csiReq,
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
if err != nil {
return err
}
Expand Down Expand Up @@ -132,7 +144,12 @@ func (c *CSIController) DetachVolume(req *structs.ClientCSIControllerDetachVolum
// Submit the request for a volume to the CSI Plugin.
ctx, cancelFn := c.requestContext()
defer cancelFn()
_, err = plugin.ControllerUnpublishVolume(ctx, csiReq)
// CSI ControllerUnpublishVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
_, err = plugin.ControllerUnpublishVolume(ctx, csiReq,
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
if err != nil {
return err
}
Expand Down
14 changes: 9 additions & 5 deletions client/pluginmanager/csimanager/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,8 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume,
return err
}

// We currently treat all explicit CSI NodeStageVolume errors (aside from timeouts, codes.ResourceExhausted, and codes.Unavailable)
// as fatal.
// In the future, we can provide more useful error messages based on
// different types of error. For error documentation see:
// https://github.com/container-storage-interface/spec/blob/4731db0e0bc53238b93850f43ab05d9355df0fd9/spec.md#nodestagevolume-errors
// CSI NodeStageVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
return v.plugin.NodeStageVolume(ctx,
vol.ID,
publishContext,
Expand Down Expand Up @@ -199,6 +196,8 @@ func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolum
return nil, err
}

// CSI NodePublishVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
err = v.plugin.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
VolumeID: vol.RemoteID(),
PublishContext: publishContext,
Expand Down Expand Up @@ -249,6 +248,9 @@ func (v *volumeManager) unstageVolume(ctx context.Context, vol *structs.CSIVolum
logger := hclog.FromContext(ctx)
logger.Trace("Unstaging volume")
stagingPath := v.stagingDirForVolume(v.containerMountPoint, vol, usage)

// CSI NodeUnstageVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
return v.plugin.NodeUnstageVolume(ctx,
vol.ID,
stagingPath,
Expand All @@ -274,6 +276,8 @@ func combineErrors(maybeErrs ...error) error {
func (v *volumeManager) unpublishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions) error {
pluginTargetPath := v.allocDirForVolume(v.containerMountPoint, vol, alloc, usage)

// CSI NodeUnpublishVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
rpcErr := v.plugin.NodeUnpublishVolume(ctx, vol.ID, pluginTargetPath,
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
grpc_retry.WithMax(3),
Expand Down
33 changes: 24 additions & 9 deletions plugins/csi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ func newGrpcConn(addr string, logger hclog.Logger) (*grpc.ClientConn, error) {
// PluginInfo describes the type and version of a plugin as required by the nomad
// base.BasePlugin interface.
func (c *client) PluginInfo() (*base.PluginInfoResponse, error) {
name, version, err := c.PluginGetInfo(context.TODO())
// note: no grpc retries needed here, as this is called in
// fingerprinting and will get retried by the caller.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
name, version, err := c.PluginGetInfo(ctx)
if err != nil {
return nil, err
}
Expand All @@ -155,6 +159,7 @@ func (c *client) SetConfig(_ *base.Config) error {
}

func (c *client) PluginProbe(ctx context.Context) (bool, error) {
// note: no grpc retries should be done here
req, err := c.identityClient.Probe(ctx, &csipbv1.ProbeRequest{})
if err != nil {
return false, err
Expand Down Expand Up @@ -205,7 +210,10 @@ func (c *client) PluginGetCapabilities(ctx context.Context) (*PluginCapabilitySe
return nil, fmt.Errorf("Client not initialized")
}

resp, err := c.identityClient.GetPluginCapabilities(ctx, &csipbv1.GetPluginCapabilitiesRequest{})
// note: no grpc retries needed here, as this is called in
// fingerprinting and will get retried by the caller
resp, err := c.identityClient.GetPluginCapabilities(ctx,
&csipbv1.GetPluginCapabilitiesRequest{})
if err != nil {
return nil, err
}
Expand All @@ -225,15 +233,18 @@ func (c *client) ControllerGetCapabilities(ctx context.Context) (*ControllerCapa
return nil, fmt.Errorf("controllerClient not initialized")
}

resp, err := c.controllerClient.ControllerGetCapabilities(ctx, &csipbv1.ControllerGetCapabilitiesRequest{})
// note: no grpc retries needed here, as this is called in
// fingerprinting and will get retried by the caller
resp, err := c.controllerClient.ControllerGetCapabilities(ctx,
&csipbv1.ControllerGetCapabilitiesRequest{})
if err != nil {
return nil, err
}

return NewControllerCapabilitySet(resp), nil
}

func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPublishVolumeRequest) (*ControllerPublishVolumeResponse, error) {
func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPublishVolumeRequest, opts ...grpc.CallOption) (*ControllerPublishVolumeResponse, error) {
if c == nil {
return nil, fmt.Errorf("Client not initialized")
}
Expand All @@ -247,7 +258,7 @@ func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPub
}

pbrequest := req.ToCSIRepresentation()
resp, err := c.controllerClient.ControllerPublishVolume(ctx, pbrequest)
resp, err := c.controllerClient.ControllerPublishVolume(ctx, pbrequest, opts...)
if err != nil {
return nil, err
}
Expand All @@ -257,7 +268,7 @@ func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPub
}, nil
}

func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerUnpublishVolumeRequest) (*ControllerUnpublishVolumeResponse, error) {
func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerUnpublishVolumeRequest, opts ...grpc.CallOption) (*ControllerUnpublishVolumeResponse, error) {
if c == nil {
return nil, fmt.Errorf("Client not initialized")
}
Expand All @@ -270,15 +281,15 @@ func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerU
}

upbrequest := req.ToCSIRepresentation()
_, err = c.controllerClient.ControllerUnpublishVolume(ctx, upbrequest)
_, err = c.controllerClient.ControllerUnpublishVolume(ctx, upbrequest, opts...)
if err != nil {
return nil, err
}

return &ControllerUnpublishVolumeResponse{}, nil
}

func (c *client) ControllerValidateCapabilties(ctx context.Context, volumeID string, capabilities *VolumeCapability) error {
func (c *client) ControllerValidateCapabilities(ctx context.Context, volumeID string, capabilities *VolumeCapability, opts ...grpc.CallOption) error {
if c == nil {
return fmt.Errorf("Client not initialized")
}
Expand All @@ -301,7 +312,7 @@ func (c *client) ControllerValidateCapabilties(ctx context.Context, volumeID str
},
}

resp, err := c.controllerClient.ValidateVolumeCapabilities(ctx, req)
resp, err := c.controllerClient.ValidateVolumeCapabilities(ctx, req, opts...)
if err != nil {
return err
}
Expand Down Expand Up @@ -329,6 +340,8 @@ func (c *client) NodeGetCapabilities(ctx context.Context) (*NodeCapabilitySet, e
return nil, fmt.Errorf("Client not initialized")
}

// note: no grpc retries needed here, as this is called in
// fingerprinting and will get retried by the caller
resp, err := c.nodeClient.NodeGetCapabilities(ctx, &csipbv1.NodeGetCapabilitiesRequest{})
if err != nil {
return nil, err
Expand All @@ -347,6 +360,8 @@ func (c *client) NodeGetInfo(ctx context.Context) (*NodeGetInfoResponse, error)

result := &NodeGetInfoResponse{}

// note: no grpc retries needed here, as this is called in
// fingerprinting and will get retried by the caller
resp, err := c.nodeClient.NodeGetInfo(ctx, &csipbv1.NodeGetInfoRequest{})
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions plugins/csi/fake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (c *Client) ControllerGetCapabilities(ctx context.Context) (*csi.Controller
}

// ControllerPublishVolume is used to attach a remote volume to a node
func (c *Client) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
func (c *Client) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest, opts ...grpc.CallOption) (*csi.ControllerPublishVolumeResponse, error) {
c.Mu.Lock()
defer c.Mu.Unlock()

Expand All @@ -150,7 +150,7 @@ func (c *Client) ControllerPublishVolume(ctx context.Context, req *csi.Controlle
}

// ControllerUnpublishVolume is used to detach a remote volume from a node
func (c *Client) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
func (c *Client) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest, opts ...grpc.CallOption) (*csi.ControllerUnpublishVolumeResponse, error) {
c.Mu.Lock()
defer c.Mu.Unlock()

Expand All @@ -159,7 +159,7 @@ func (c *Client) ControllerUnpublishVolume(ctx context.Context, req *csi.Control
return c.NextControllerUnpublishVolumeResponse, c.NextControllerUnpublishVolumeErr
}

func (c *Client) ControllerValidateCapabilties(ctx context.Context, volumeID string, capabilities *csi.VolumeCapability) error {
func (c *Client) ControllerValidateCapabilities(ctx context.Context, volumeID string, capabilities *csi.VolumeCapability, opts ...grpc.CallOption) error {
c.Mu.Lock()
defer c.Mu.Unlock()

Expand Down
6 changes: 3 additions & 3 deletions plugins/csi/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ type CSIPlugin interface {
ControllerGetCapabilities(ctx context.Context) (*ControllerCapabilitySet, error)

// ControllerPublishVolume is used to attach a remote volume to a cluster node.
ControllerPublishVolume(ctx context.Context, req *ControllerPublishVolumeRequest) (*ControllerPublishVolumeResponse, error)
ControllerPublishVolume(ctx context.Context, req *ControllerPublishVolumeRequest, opts ...grpc.CallOption) (*ControllerPublishVolumeResponse, error)

// ControllerUnpublishVolume is used to detach a remote volume from a cluster node.
ControllerUnpublishVolume(ctx context.Context, req *ControllerUnpublishVolumeRequest) (*ControllerUnpublishVolumeResponse, error)
ControllerUnpublishVolume(ctx context.Context, req *ControllerUnpublishVolumeRequest, opts ...grpc.CallOption) (*ControllerUnpublishVolumeResponse, error)

// ControllerValidateCapabilities is used to validate that a volume exists and
// supports the requested capability.
ControllerValidateCapabilties(ctx context.Context, volumeID string, capabilities *VolumeCapability) error
ControllerValidateCapabilities(ctx context.Context, volumeID string, capabilities *VolumeCapability, opts ...grpc.CallOption) error

// NodeGetCapabilities is used to return the available capabilities from the
// Node Service.
Expand Down

0 comments on commit ffa13ad

Please sign in to comment.