Skip to content

Commit

Permalink
Retry provisioning of volumes after transient error
Browse files Browse the repository at this point in the history
The provisioner should retry CreateVolume call after a transient error
(such as timeout), because the CSI driver may be creating a volume in the
background.

Therefore ProvisionerExt interface need to be implemented. ProvisionExt()
returns:

- Finished, if it can be 100% sure that the driver is not creating a volume
- NoChange, if something (temporarily?) failed before reaching the CSI
  driver, for example when Kubernetes API server is not reachable.
- InBackground, if error returned by the driver (or gRPC) is transient.
  • Loading branch information
jsafrane committed Jul 8, 2019
1 parent 27750ab commit d7d27c3
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 27 deletions.
79 changes: 58 additions & 21 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"strings"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-lib-utils/connection"
"github.com/kubernetes-csi/external-provisioner/pkg/features"
Expand Down Expand Up @@ -179,6 +182,7 @@ type csiProvisioner struct {

var _ controller.Provisioner = &csiProvisioner{}
var _ controller.BlockProvisioner = &csiProvisioner{}
var _ controller.ProvisionerExt = &csiProvisioner{}

var (
// Each provisioner have a identify string to distinguish with others. This
Expand Down Expand Up @@ -353,8 +357,14 @@ func getVolumeCapability(
}

func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.PersistentVolume, error) {
// The controller should call ProvisionExt() instead, but just in case...
pv, _, err := p.ProvisionExt(options)
return pv, err
}

func (p *csiProvisioner) ProvisionExt(options controller.ProvisionOptions) (*v1.PersistentVolume, controller.ProvisioningState, error) {
if options.StorageClass == nil {
return nil, errors.New("storage class was nil")
return nil, controller.ProvisioningFinished, errors.New("storage class was nil")
}

migratedVolume := false
Expand All @@ -367,7 +377,7 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per
klog.V(2).Infof("translating storage class for in-tree plugin %s to CSI", options.StorageClass.Provisioner)
storageClass, err := csitranslationlib.TranslateInTreeStorageClassToCSI(p.supportsMigrationFromInTreePluginName, options.StorageClass)
if err != nil {
return nil, fmt.Errorf("failed to translate storage class: %v", err)
return nil, controller.ProvisioningFinished, fmt.Errorf("failed to translate storage class: %v", err)
}
options.StorageClass = storageClass
migratedVolume = true
Expand All @@ -381,13 +391,13 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per
if options.PVC.Spec.DataSource != nil {
// PVC.Spec.DataSource.Name is the name of the VolumeSnapshot API object
if options.PVC.Spec.DataSource.Name == "" {
return nil, fmt.Errorf("the PVC source not found for PVC %s", options.PVC.Name)
return nil, controller.ProvisioningFinished, fmt.Errorf("the PVC source not found for PVC %s", options.PVC.Name)
}

switch options.PVC.Spec.DataSource.Kind {
case snapshotKind:
if *(options.PVC.Spec.DataSource.APIGroup) != snapshotAPIGroup {
return nil, fmt.Errorf("the PVC source does not belong to the right APIGroup. Expected %s, Got %s", snapshotAPIGroup, *(options.PVC.Spec.DataSource.APIGroup))
return nil, controller.ProvisioningFinished, fmt.Errorf("the PVC source does not belong to the right APIGroup. Expected %s, Got %s", snapshotAPIGroup, *(options.PVC.Spec.DataSource.APIGroup))
}
rc.snapshot = true
case pvcKind:
Expand All @@ -397,16 +407,16 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per
}
}
if err := p.checkDriverCapabilities(rc); err != nil {
return nil, err
return nil, controller.ProvisioningFinished, err
}

if options.PVC.Spec.Selector != nil {
return nil, fmt.Errorf("claim Selector is not supported")
return nil, controller.ProvisioningFinished, fmt.Errorf("claim Selector is not supported")
}

pvName, err := makeVolumeName(p.volumeNamePrefix, fmt.Sprintf("%s", options.PVC.ObjectMeta.UID), p.volumeNameUUIDLength)
if err != nil {
return nil, err
return nil, controller.ProvisioningFinished, err
}

fsTypesFound := 0
Expand All @@ -421,7 +431,7 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per
}
}
if fsTypesFound > 1 {
return nil, fmt.Errorf("fstype specified in parameters with both \"fstype\" and \"%s\" keys", prefixedFsTypeKey)
return nil, controller.ProvisioningFinished, fmt.Errorf("fstype specified in parameters with both \"fstype\" and \"%s\" keys", prefixedFsTypeKey)
}
if len(fsType) == 0 {
fsType = defaultFSType
Expand Down Expand Up @@ -449,7 +459,7 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per
if options.PVC.Spec.DataSource != nil && (rc.clone || rc.snapshot) {
volumeContentSource, err := p.getVolumeContentSource(options)
if err != nil {
return nil, fmt.Errorf("error getting handle for DataSource Type %s by Name %s: %v", options.PVC.Spec.DataSource.Kind, options.PVC.Spec.DataSource.Name, err)
return nil, controller.ProvisioningNoChange, fmt.Errorf("error getting handle for DataSource Type %s by Name %s: %v", options.PVC.Spec.DataSource.Kind, options.PVC.Spec.DataSource.Name, err)
}
req.VolumeContentSource = volumeContentSource
}
Expand All @@ -463,7 +473,7 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per
options.SelectedNode,
p.strictTopology)
if err != nil {
return nil, fmt.Errorf("error generating accessibility requirements: %v", err)
return nil, controller.ProvisioningNoChange, fmt.Errorf("error generating accessibility requirements: %v", err)
}
req.AccessibilityRequirements = requirements
}
Expand All @@ -480,43 +490,46 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per
},
})
if err != nil {
return nil, err
return nil, controller.ProvisioningNoChange, err
}
provisionerCredentials, err := getCredentials(p.client, provisionerSecretRef)
if err != nil {
return nil, err
return nil, controller.ProvisioningNoChange, err
}
req.Secrets = provisionerCredentials

// Resolve controller publish, node stage, node publish secret references
controllerPublishSecretRef, err := getSecretReference(controllerPublishSecretParams, options.StorageClass.Parameters, pvName, options.PVC)
if err != nil {
return nil, err
return nil, controller.ProvisioningNoChange, err
}
nodeStageSecretRef, err := getSecretReference(nodeStageSecretParams, options.StorageClass.Parameters, pvName, options.PVC)
if err != nil {
return nil, err
return nil, controller.ProvisioningNoChange, err
}
nodePublishSecretRef, err := getSecretReference(nodePublishSecretParams, options.StorageClass.Parameters, pvName, options.PVC)
if err != nil {
return nil, err
return nil, controller.ProvisioningNoChange, err
}
controllerExpandSecretRef, err := getSecretReference(controllerExpandSecretParams, options.StorageClass.Parameters, pvName, options.PVC)
if err != nil {
return nil, err
return nil, controller.ProvisioningNoChange, err
}

req.Parameters, err = removePrefixedParameters(options.StorageClass.Parameters)
if err != nil {
return nil, fmt.Errorf("failed to strip CSI Parameters of prefixed keys: %v", err)
return nil, controller.ProvisioningFinished, fmt.Errorf("failed to strip CSI Parameters of prefixed keys: %v", err)
}

ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
defer cancel()
rep, err = p.csiClient.CreateVolume(ctx, &req)

if err != nil {
return nil, err
if isFinalError(err) {
return nil, controller.ProvisioningFinished, err
}
return nil, controller.ProvisioningInBackground, err
}

if rep.Volume != nil {
Expand All @@ -539,7 +552,8 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per
if err != nil {
capErr = fmt.Errorf("%v. Cleanup of volume %s failed, volume is orphaned: %v", capErr, pvName, err)
}
return nil, capErr
// use InBackground to retry the call, hoping the volume is deleted correctly next time.
return nil, controller.ProvisioningInBackground, capErr
}

pv := &v1.PersistentVolume{
Expand Down Expand Up @@ -589,13 +603,13 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per
if err != nil {
klog.Warningf("failed to translate CSI PV to in-tree due to: %v. Deleting provisioned PV", err)
p.Delete(pv)
return nil, err
return nil, controller.ProvisioningFinished, err
}
}

klog.Infof("successfully created PV %+v", pv.Spec.PersistentVolumeSource)

return pv, nil
return pv, controller.ProvisioningFinished, nil
}

func (p *csiProvisioner) supportsTopology() bool {
Expand Down Expand Up @@ -1004,3 +1018,26 @@ func deprecationWarning(deprecatedParam, newParam, removalVersion string) string
}
return fmt.Sprintf("\"%s\" is deprecated and will be removed in %s%s", deprecatedParam, removalVersion, newParamPhrase)
}

func isFinalError(err error) bool {
// Sources:
// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
// https://github.com/container-storage-interface/spec/blob/master/spec.md
st, ok := status.FromError(err)
if !ok {
// This is not gRPC error. The operation must have failed before gRPC
// method was called, otherwise we would get gRPC error.
// We don't know if any previous CreateVolume is in progress, be on the safe side.
return false
}
switch st.Code() {
case codes.Canceled, // gRPC: Client Application cancelled the request
codes.DeadlineExceeded, // gRPC: Timeout
codes.Unavailable, // gRPC: Server shutting down, TCP connection broken - previous CreateVolume() may be still in progress.
codes.ResourceExhausted: // gRPC: Server temporarily out of resources - previous Provision() may be still in progress.
return false
}
// All other errors mean that provisioning either did not
// even start or failed. It is for sure not in progress.
return true
}
Loading

0 comments on commit d7d27c3

Please sign in to comment.