diff --git a/Gopkg.lock b/Gopkg.lock index 1ffd2accf7..4c81e1ba82 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -836,6 +836,8 @@ "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/fake", "github.com/spf13/pflag", "google.golang.org/grpc", + "google.golang.org/grpc/codes", + "google.golang.org/grpc/status", "k8s.io/api/core/v1", "k8s.io/api/storage/v1", "k8s.io/api/storage/v1beta1", diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 22f6c41d3b..5d186252ba 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -25,6 +25,9 @@ import ( "strings" "time" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "github.com/container-storage-interface/spec/lib/go/csi" "github.com/kubernetes-csi/csi-lib-utils/connection" "github.com/kubernetes-csi/external-provisioner/pkg/features" @@ -166,6 +169,7 @@ type csiProvisioner struct { var _ controller.Provisioner = &csiProvisioner{} var _ controller.BlockProvisioner = &csiProvisioner{} +var _ controller.ProvisionerExt = &csiProvisioner{} var ( // Each provisioner have a identify string to distinguish with others. This @@ -333,8 +337,14 @@ func getVolumeCapability( } func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.PersistentVolume, error) { + // The controller should call ProvisionExt() instead, but just in case... + pv, _, err := p.ProvisionExt(options) + return pv, err +} + +func (p *csiProvisioner) ProvisionExt(options controller.ProvisionOptions) (*v1.PersistentVolume, controller.ProvisioningState, error) { if options.StorageClass == nil { - return nil, errors.New("storage class was nil") + return nil, controller.ProvisioningFinished, errors.New("storage class was nil") } migratedVolume := false @@ -347,7 +357,7 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per klog.V(2).Infof("translating storage class for in-tree plugin %s to CSI", options.StorageClass.Provisioner) storageClass, err := csitranslationlib.TranslateInTreeStorageClassToCSI(p.supportsMigrationFromInTreePluginName, options.StorageClass) if err != nil { - return nil, fmt.Errorf("failed to translate storage class: %v", err) + return nil, controller.ProvisioningFinished, fmt.Errorf("failed to translate storage class: %v", err) } options.StorageClass = storageClass migratedVolume = true @@ -360,28 +370,28 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per if options.PVC.Spec.DataSource != nil { // PVC.Spec.DataSource.Name is the name of the VolumeSnapshot API object if options.PVC.Spec.DataSource.Name == "" { - return nil, fmt.Errorf("the PVC source not found for PVC %s", options.PVC.Name) + return nil, controller.ProvisioningFinished, fmt.Errorf("the PVC source not found for PVC %s", options.PVC.Name) } if options.PVC.Spec.DataSource.Kind != snapshotKind { - return nil, fmt.Errorf("the PVC source is not the right type. Expected %s, Got %s", snapshotKind, options.PVC.Spec.DataSource.Kind) + return nil, controller.ProvisioningFinished, fmt.Errorf("the PVC source is not the right type. Expected %s, Got %s", snapshotKind, options.PVC.Spec.DataSource.Kind) } if *(options.PVC.Spec.DataSource.APIGroup) != snapshotAPIGroup { - return nil, fmt.Errorf("the PVC source does not belong to the right APIGroup. Expected %s, Got %s", snapshotAPIGroup, *(options.PVC.Spec.DataSource.APIGroup)) + return nil, controller.ProvisioningFinished, fmt.Errorf("the PVC source does not belong to the right APIGroup. Expected %s, Got %s", snapshotAPIGroup, *(options.PVC.Spec.DataSource.APIGroup)) } needSnapshotSupport = true } if err := p.checkDriverCapabilities(needSnapshotSupport); err != nil { - return nil, err + return nil, controller.ProvisioningFinished, err } if options.PVC.Spec.Selector != nil { - return nil, fmt.Errorf("claim Selector is not supported") + return nil, controller.ProvisioningFinished, fmt.Errorf("claim Selector is not supported") } pvName, err := makeVolumeName(p.volumeNamePrefix, fmt.Sprintf("%s", options.PVC.ObjectMeta.UID), p.volumeNameUUIDLength) if err != nil { - return nil, err + return nil, controller.ProvisioningFinished, err } fsTypesFound := 0 @@ -396,7 +406,7 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per } } if fsTypesFound > 1 { - return nil, fmt.Errorf("fstype specified in parameters with both \"fstype\" and \"%s\" keys", prefixedFsTypeKey) + return nil, controller.ProvisioningFinished, fmt.Errorf("fstype specified in parameters with both \"fstype\" and \"%s\" keys", prefixedFsTypeKey) } if len(fsType) == 0 { fsType = defaultFSType @@ -424,7 +434,7 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per if needSnapshotSupport { volumeContentSource, err := p.getVolumeContentSource(options) if err != nil { - return nil, fmt.Errorf("error getting snapshot handle for snapshot %s: %v", options.PVC.Spec.DataSource.Name, err) + return nil, controller.ProvisioningNoChange, fmt.Errorf("error getting snapshot handle for snapshot %s: %v", options.PVC.Spec.DataSource.Name, err) } req.VolumeContentSource = volumeContentSource } @@ -438,7 +448,7 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per options.SelectedNode, p.strictTopology) if err != nil { - return nil, fmt.Errorf("error generating accessibility requirements: %v", err) + return nil, controller.ProvisioningNoChange, fmt.Errorf("error generating accessibility requirements: %v", err) } req.AccessibilityRequirements = requirements } @@ -455,31 +465,31 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per }, }) if err != nil { - return nil, err + return nil, controller.ProvisioningNoChange, err } provisionerCredentials, err := getCredentials(p.client, provisionerSecretRef) if err != nil { - return nil, err + return nil, controller.ProvisioningNoChange, err } req.Secrets = provisionerCredentials // Resolve controller publish, node stage, node publish secret references controllerPublishSecretRef, err := getSecretReference(controllerPublishSecretParams, options.StorageClass.Parameters, pvName, options.PVC) if err != nil { - return nil, err + return nil, controller.ProvisioningNoChange, err } nodeStageSecretRef, err := getSecretReference(nodeStageSecretParams, options.StorageClass.Parameters, pvName, options.PVC) if err != nil { - return nil, err + return nil, controller.ProvisioningNoChange, err } nodePublishSecretRef, err := getSecretReference(nodePublishSecretParams, options.StorageClass.Parameters, pvName, options.PVC) if err != nil { - return nil, err + return nil, controller.ProvisioningNoChange, err } req.Parameters, err = removePrefixedParameters(options.StorageClass.Parameters) if err != nil { - return nil, fmt.Errorf("failed to strip CSI Parameters of prefixed keys: %v", err) + return nil, controller.ProvisioningFinished, fmt.Errorf("failed to strip CSI Parameters of prefixed keys: %v", err) } ctx, cancel := context.WithTimeout(context.Background(), p.timeout) @@ -487,7 +497,10 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per rep, err = p.csiClient.CreateVolume(ctx, &req) if err != nil { - return nil, err + if isFinalError(err) { + return nil, controller.ProvisioningFinished, err + } + return nil, controller.ProvisioningInBackground, err } if rep.Volume != nil { @@ -510,7 +523,8 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per if err != nil { capErr = fmt.Errorf("%v. Cleanup of volume %s failed, volume is orphaned: %v", capErr, pvName, err) } - return nil, capErr + // use InBackground to retry the call, hoping the volume is deleted correctly next time. + return nil, controller.ProvisioningInBackground, capErr } pv := &v1.PersistentVolume{ @@ -558,14 +572,19 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per pv, err = csitranslationlib.TranslateCSIPVToInTree(pv) if err != nil { klog.Warningf("failed to translate CSI PV to in-tree due to: %v. Deleting provisioned PV", err) - p.Delete(pv) - return nil, err + deleteErr := p.Delete(pv) + if deleteErr != nil { + klog.Warningf("failed to delete partly provisioned PV: %v", deleteErr) + // Retry the call again to clean up the orphan + return nil, controller.ProvisioningInBackground, err + } + return nil, controller.ProvisioningFinished, err } } klog.Infof("successfully created PV %+v", pv.Spec.PersistentVolumeSource) - return pv, nil + return pv, controller.ProvisioningFinished, nil } func (p *csiProvisioner) supportsTopology() bool { @@ -919,3 +938,27 @@ func deprecationWarning(deprecatedParam, newParam, removalVersion string) string } return fmt.Sprintf("\"%s\" is deprecated and will be removed in %s%s", deprecatedParam, removalVersion, newParamPhrase) } + +func isFinalError(err error) bool { + // Sources: + // https://github.com/grpc/grpc/blob/master/doc/statuscodes.md + // https://github.com/container-storage-interface/spec/blob/master/spec.md + st, ok := status.FromError(err) + if !ok { + // This is not gRPC error. The operation must have failed before gRPC + // method was called, otherwise we would get gRPC error. + // We don't know if any previous CreateVolume is in progress, be on the safe side. + return false + } + switch st.Code() { + case codes.Canceled, // gRPC: Client Application cancelled the request + codes.DeadlineExceeded, // gRPC: Timeout + codes.Unavailable, // gRPC: Server shutting down, TCP connection broken - previous CreateVolume() may be still in progress. + codes.ResourceExhausted, // gRPC: Server temporarily out of resources - previous CreateVolume() may be still in progress. + codes.Aborted: // CSI: Operation pending for volume + return false + } + // All other errors mean that provisioning either did not + // even start or failed. It is for sure not in progress. + return true +} diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 83197a04de..c50bc6da5a 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -27,6 +27,9 @@ import ( "testing" "time" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "github.com/container-storage-interface/spec/lib/go/csi" "github.com/golang/mock/gomock" "github.com/kubernetes-csi/csi-lib-utils/connection" @@ -723,7 +726,9 @@ type provisioningTestcase struct { volWithLessCap bool expectedPVSpec *pvSpec withSecretRefs bool + createVolumeError error expectErr bool + expectState controller.ProvisioningState expectCreateVolDo interface{} } @@ -766,6 +771,7 @@ func TestProvision(t *testing.T) { }, }, }, + expectState: controller.ProvisioningFinished, }, "multiple fsType provision": { volOpts: controller.ProvisionOptions{ @@ -779,7 +785,8 @@ func TestProvision(t *testing.T) { PVName: "test-name", PVC: createFakePVC(requestedBytes), }, - expectErr: true, + expectErr: true, + expectState: controller.ProvisioningFinished, }, "provision with prefixed FS Type key": { volOpts: controller.ProvisionOptions{ @@ -812,6 +819,7 @@ func TestProvision(t *testing.T) { t.Errorf("Parameters should have been stripped") } }, + expectState: controller.ProvisioningFinished, }, "provision with access mode multi node multi writer": { volOpts: controller.ProvisionOptions{ @@ -862,6 +870,7 @@ func TestProvision(t *testing.T) { t.Errorf("Expected multi_node_multi_writer") } }, + expectState: controller.ProvisioningFinished, }, "provision with access mode multi node multi readonly": { volOpts: controller.ProvisionOptions{ @@ -912,6 +921,7 @@ func TestProvision(t *testing.T) { t.Errorf("Expected multi_node_reader_only") } }, + expectState: controller.ProvisioningFinished, }, "provision with access mode single writer": { volOpts: controller.ProvisionOptions{ @@ -962,6 +972,7 @@ func TestProvision(t *testing.T) { t.Errorf("Expected single_node_writer") } }, + expectState: controller.ProvisioningFinished, }, "provision with multiple access modes": { volOpts: controller.ProvisionOptions{ @@ -1018,6 +1029,7 @@ func TestProvision(t *testing.T) { t.Errorf("Expected single_node_writer") } }, + expectState: controller.ProvisioningFinished, }, "provision with secrets": { volOpts: controller.ProvisionOptions{ @@ -1056,6 +1068,7 @@ func TestProvision(t *testing.T) { }, }, }, + expectState: controller.ProvisioningFinished, }, "provision with volume mode(Filesystem)": { volOpts: controller.ProvisionOptions{ @@ -1082,6 +1095,7 @@ func TestProvision(t *testing.T) { }, }, }, + expectState: controller.ProvisioningFinished, }, "provision with volume mode(Block)": { volOpts: controller.ProvisionOptions{ @@ -1107,6 +1121,7 @@ func TestProvision(t *testing.T) { }, }, }, + expectState: controller.ProvisioningFinished, }, "fail to get secret reference": { volOpts: controller.ProvisionOptions{ @@ -1118,6 +1133,7 @@ func TestProvision(t *testing.T) { }, getSecretRefErr: true, expectErr: true, + expectState: controller.ProvisioningNoChange, }, "fail not nil selector": { volOpts: controller.ProvisionOptions{ @@ -1126,6 +1142,7 @@ func TestProvision(t *testing.T) { }, notNilSelector: true, expectErr: true, + expectState: controller.ProvisioningFinished, }, "fail to make volume name": { volOpts: controller.ProvisionOptions{ @@ -1134,6 +1151,7 @@ func TestProvision(t *testing.T) { }, makeVolumeNameErr: true, expectErr: true, + expectState: controller.ProvisioningFinished, }, "fail to get credentials": { volOpts: controller.ProvisionOptions{ @@ -1145,6 +1163,7 @@ func TestProvision(t *testing.T) { }, getCredentialsErr: true, expectErr: true, + expectState: controller.ProvisioningNoChange, }, "fail vol with less capacity": { volOpts: controller.ProvisionOptions{ @@ -1156,6 +1175,7 @@ func TestProvision(t *testing.T) { }, volWithLessCap: true, expectErr: true, + expectState: controller.ProvisioningInBackground, }, "provision with mount options": { volOpts: controller.ProvisionOptions{ @@ -1214,6 +1234,65 @@ func TestProvision(t *testing.T) { t.Errorf("Expected 2 mount options") } }, + expectState: controller.ProvisioningFinished, + }, + "provision with final error": { + volOpts: controller.ProvisionOptions{ + StorageClass: &storagev1.StorageClass{ + Parameters: map[string]string{}, + ReclaimPolicy: &deletePolicy, + }, + PVName: "test-name", + PVC: &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + UID: "testid", + }, + Spec: v1.PersistentVolumeClaimSpec{ + Selector: nil, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceStorage): resource.MustParse(strconv.FormatInt(requestedBytes, 10)), + }, + }, + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + }, + }, + }, + createVolumeError: status.Error(codes.Unauthenticated, "Mock final error"), + expectCreateVolDo: func(ctx context.Context, req *csi.CreateVolumeRequest) { + // intentionally empty + }, + expectErr: true, + expectState: controller.ProvisioningFinished, + }, + "provision with transient error": { + volOpts: controller.ProvisionOptions{ + StorageClass: &storagev1.StorageClass{ + Parameters: map[string]string{}, + ReclaimPolicy: &deletePolicy, + }, + PVName: "test-name", + PVC: &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + UID: "testid", + }, + Spec: v1.PersistentVolumeClaimSpec{ + Selector: nil, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceStorage): resource.MustParse(strconv.FormatInt(requestedBytes, 10)), + }, + }, + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + }, + }, + }, + createVolumeError: status.Error(codes.DeadlineExceeded, "Mock timeout"), + expectCreateVolDo: func(ctx context.Context, req *csi.CreateVolumeRequest) { + // intentionally empty + }, + expectErr: true, + expectState: controller.ProvisioningInBackground, }, } @@ -1316,18 +1395,18 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested tc.volOpts.StorageClass.Parameters[provisionerSecretNamespaceKey] = "default" } else if tc.volWithLessCap { out.Volume.CapacityBytes = int64(80) - controllerServer.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Return(out, nil).Times(1) - controllerServer.EXPECT().DeleteVolume(gomock.Any(), gomock.Any()).Return(&csi.DeleteVolumeResponse{}, nil).Times(1) + controllerServer.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Return(out, tc.createVolumeError).Times(1) + controllerServer.EXPECT().DeleteVolume(gomock.Any(), gomock.Any()).Return(&csi.DeleteVolumeResponse{}, tc.createVolumeError).Times(1) } else if tc.expectCreateVolDo != nil { - controllerServer.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Do(tc.expectCreateVolDo).Return(out, nil).Times(1) + controllerServer.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Do(tc.expectCreateVolDo).Return(out, tc.createVolumeError).Times(1) } else { // Setup regular mock call expectations. if !tc.expectErr { - controllerServer.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Return(out, nil).Times(1) + controllerServer.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Return(out, tc.createVolumeError).Times(1) } } - pv, err := csiProvisioner.Provision(tc.volOpts) + pv, state, err := csiProvisioner.(controller.ProvisionerExt).ProvisionExt(tc.volOpts) if tc.expectErr && err == nil { t.Errorf("test %q: Expected error, got none", k) } @@ -1335,6 +1414,13 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested t.Errorf("test %q: got error: %v", k, err) } + if tc.expectState == "" { + tc.expectState = controller.ProvisioningFinished + } + if tc.expectState != state { + t.Errorf("test %q: expected ProvisioningState %s, got %s", k, tc.expectState, state) + } + if tc.expectedPVSpec != nil { if pv.Name != tc.expectedPVSpec.Name { t.Errorf("test %q: expected PV name: %q, got: %q", k, tc.expectedPVSpec.Name, pv.Name)