From ae0c72e682306f88f3dac41e02dacd35b6606a20 Mon Sep 17 00:00:00 2001 From: Deep Debroy Date: Thu, 14 Mar 2019 21:35:41 -0700 Subject: [PATCH] Add dynamic provisioning support for CSI Migration Signed-off-by: Deep Debroy --- cmd/csi-provisioner/csi-provisioner.go | 29 +++++++-- pkg/controller/controller.go | 88 ++++++++++++++++++-------- pkg/controller/controller_test.go | 10 +-- 3 files changed, 88 insertions(+), 39 deletions(-) diff --git a/cmd/csi-provisioner/csi-provisioner.go b/cmd/csi-provisioner/csi-provisioner.go index d1efbfa271..dfdbdb4fa4 100644 --- a/cmd/csi-provisioner/csi-provisioner.go +++ b/cmd/csi-provisioner/csi-provisioner.go @@ -39,6 +39,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/workqueue" + csitranslationlib "k8s.io/csi-translation-lib" utilfeature "k8s.io/apiserver/pkg/util/feature" utilflag "k8s.io/apiserver/pkg/util/flag" @@ -155,19 +156,35 @@ func init() { timeStamp := time.Now().UnixNano() / int64(time.Millisecond) identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + provisionerName + provisionerOptions := []func(*controller.ProvisionController) error{ + controller.LeaderElection(*enableLeaderElection), + controller.FailedProvisionThreshold(0), + controller.FailedDeleteThreshold(0), + controller.RateLimiter(workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)), + controller.Threadiness(int(*workerThreads)), + } + + handlesMigrationFromInTreePlugin := false + handlesMigrationFromInTreePluginName := "" + if csitranslationlib.IsMigratedCSIDriverByName(provisionerName) { + handlesMigrationFromInTreePluginName, err = csitranslationlib.GetInTreeNameFromCSIName(provisionerName) + klog.V(2).Infof("Perform CSI migration for %s to %s", provisionerName, handlesMigrationFromInTreePluginName) + if err != nil { + klog.Fatalf("Failed to get InTree plugin name for migrated CSI plugin %s: %s", provisionerName, err) + } + provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{handlesMigrationFromInTreePluginName})) + handlesMigrationFromInTreePlugin = true + } + // Create the provisioner: it implements the Provisioner interface expected by // the controller - csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName) + csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, handlesMigrationFromInTreePlugin, handlesMigrationFromInTreePluginName, provisionerName) provisionController = controller.NewProvisionController( clientset, provisionerName, csiProvisioner, serverVersion.GitVersion, - controller.LeaderElection(*enableLeaderElection), - controller.FailedProvisionThreshold(0), - controller.FailedDeleteThreshold(0), - controller.RateLimiter(workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)), - controller.Threadiness(int(*workerThreads)), + provisionerOptions..., ) } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index f040e5d7ac..d5e381ec3e 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -42,6 +42,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned" + csitranslationlib "k8s.io/csi-translation-lib" "k8s.io/klog" "google.golang.org/grpc" @@ -143,17 +144,19 @@ var ( // CSIProvisioner struct type csiProvisioner struct { - client kubernetes.Interface - csiClient csi.ControllerClient - csiAPIClient csiclientset.Interface - grpcClient *grpc.ClientConn - snapshotClient snapclientset.Interface - timeout time.Duration - identity string - volumeNamePrefix string - volumeNameUUIDLength int - config *rest.Config - driverName string + client kubernetes.Interface + csiClient csi.ControllerClient + csiAPIClient csiclientset.Interface + grpcClient *grpc.ClientConn + snapshotClient snapclientset.Interface + timeout time.Duration + identity string + volumeNamePrefix string + volumeNameUUIDLength int + config *rest.Config + driverName string + handlesMigrationFromInTreePlugin bool + handlesMigrationFromInTreePluginName string } const ( @@ -238,20 +241,24 @@ func NewCSIProvisioner(client kubernetes.Interface, volumeNameUUIDLength int, grpcClient *grpc.ClientConn, snapshotClient snapclientset.Interface, + handlesMigrationFromInTreePlugin bool, + handlesMigrationFromInTreePluginName string, driverName string) controller.Provisioner { csiClient := csi.NewControllerClient(grpcClient) provisioner := &csiProvisioner{ - client: client, - grpcClient: grpcClient, - csiClient: csiClient, - csiAPIClient: csiAPIClient, - snapshotClient: snapshotClient, - timeout: connectionTimeout, - identity: identity, - volumeNamePrefix: volumeNamePrefix, - volumeNameUUIDLength: volumeNameUUIDLength, - driverName: driverName, + client: client, + grpcClient: grpcClient, + csiClient: csiClient, + csiAPIClient: csiAPIClient, + snapshotClient: snapshotClient, + timeout: connectionTimeout, + identity: identity, + volumeNamePrefix: volumeNamePrefix, + volumeNameUUIDLength: volumeNameUUIDLength, + handlesMigrationFromInTreePlugin: handlesMigrationFromInTreePlugin, + handlesMigrationFromInTreePluginName: handlesMigrationFromInTreePluginName, + driverName: driverName, } return provisioner } @@ -362,6 +369,24 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis return nil, fmt.Errorf("claim Selector is not supported") } + createVolumeRequestParameters := options.Parameters + performInTreeTranslation := false + if p.handlesMigrationFromInTreePlugin { + storageClassName := options.PVC.Spec.StorageClassName + storageClass, err := p.client.StorageV1().StorageClasses().Get(*storageClassName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to get storage class named %s: %v", *storageClassName, err) + } + if storageClass.Provisioner == p.handlesMigrationFromInTreePluginName { + klog.V(2).Infof("Perform CSI migration for intree plugin %s", storageClass.Provisioner) + createVolumeRequestParameters, err = csitranslationlib.TranslateInTreeStorageClassParametersToCSI(p.handlesMigrationFromInTreePluginName, options.Parameters) + if err != nil { + return nil, fmt.Errorf("failed to translate storage class parameters: %v", err) + } + performInTreeTranslation = true + } + } + var needSnapshotSupport bool if options.PVC.Spec.DataSource != nil { // PVC.Spec.DataSource.Name is the name of the VolumeSnapshot API object @@ -388,7 +413,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis fsTypesFound := 0 fsType := "" - for k, v := range options.Parameters { + for k, v := range createVolumeRequestParameters { if strings.ToLower(k) == "fstype" || k == prefixedFsTypeKey { fsType = v fsTypesFound++ @@ -416,7 +441,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis // Create a CSI CreateVolumeRequest and Response req := csi.CreateVolumeRequest{ Name: pvName, - Parameters: options.Parameters, + Parameters: createVolumeRequestParameters, VolumeCapabilities: volumeCaps, CapacityRange: &csi.CapacityRange{ RequiredBytes: int64(volSizeBytes), @@ -452,7 +477,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis // Resolve provision secret credentials. // No PVC is provided when resolving provision/delete secret names, since the PVC may or may not exist at delete time. - provisionerSecretRef, err := getSecretReference(provisionerSecretParams, options.Parameters, pvName, nil) + provisionerSecretRef, err := getSecretReference(provisionerSecretParams, createVolumeRequestParameters, pvName, nil) if err != nil { return nil, err } @@ -463,20 +488,20 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis req.Secrets = provisionerCredentials // Resolve controller publish, node stage, node publish secret references - controllerPublishSecretRef, err := getSecretReference(controllerPublishSecretParams, options.Parameters, pvName, options.PVC) + controllerPublishSecretRef, err := getSecretReference(controllerPublishSecretParams, createVolumeRequestParameters, pvName, options.PVC) if err != nil { return nil, err } - nodeStageSecretRef, err := getSecretReference(nodeStageSecretParams, options.Parameters, pvName, options.PVC) + nodeStageSecretRef, err := getSecretReference(nodeStageSecretParams, createVolumeRequestParameters, pvName, options.PVC) if err != nil { return nil, err } - nodePublishSecretRef, err := getSecretReference(nodePublishSecretParams, options.Parameters, pvName, options.PVC) + nodePublishSecretRef, err := getSecretReference(nodePublishSecretParams, createVolumeRequestParameters, pvName, options.PVC) if err != nil { return nil, err } - req.Parameters, err = removePrefixedParameters(options.Parameters) + req.Parameters, err = removePrefixedParameters(createVolumeRequestParameters) if err != nil { return nil, fmt.Errorf("failed to strip CSI Parameters of prefixed keys: %v", err) } @@ -551,6 +576,13 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis pv.Spec.PersistentVolumeSource.CSI.FSType = fsType } + if performInTreeTranslation { + pv, err = csitranslationlib.TranslateCSIPVToInTree(pv) + if err != nil { + return nil, err + } + } + klog.Infof("successfully created PV %+v", pv.Spec.PersistentVolumeSource) return pv, nil diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index f83f677110..428cc04c87 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -555,7 +555,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) { defer mockController.Finish() defer driver.Stop() - csiProvisioner := NewCSIProvisioner(nil, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName) + csiProvisioner := NewCSIProvisioner(nil, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, false, "", driverName) // Requested PVC with requestedBytes storage opts := controller.VolumeOptions{ @@ -1402,7 +1402,7 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested clientSet = fakeclientset.NewSimpleClientset() } - csiProvisioner := NewCSIProvisioner(clientSet, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName) + csiProvisioner := NewCSIProvisioner(clientSet, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, false, "", driverName) out := &csi.CreateVolumeResponse{ Volume: &csi.Volume{ @@ -1759,7 +1759,7 @@ func TestProvisionFromSnapshot(t *testing.T) { return true, content, nil }) - csiProvisioner := NewCSIProvisioner(clientSet, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName) + csiProvisioner := NewCSIProvisioner(clientSet, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, false, "", driverName) out := &csi.CreateVolumeResponse{ Volume: &csi.Volume{ @@ -1857,7 +1857,7 @@ func TestProvisionWithTopology(t *testing.T) { clientSet := fakeclientset.NewSimpleClientset() csiClientSet := fakecsiclientset.NewSimpleClientset() - csiProvisioner := NewCSIProvisioner(clientSet, csiClientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName) + csiProvisioner := NewCSIProvisioner(clientSet, csiClientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, false, "", driverName) out := &csi.CreateVolumeResponse{ Volume: &csi.Volume{ @@ -1898,7 +1898,7 @@ func TestProvisionWithMountOptions(t *testing.T) { clientSet := fakeclientset.NewSimpleClientset() csiClientSet := fakecsiclientset.NewSimpleClientset() - csiProvisioner := NewCSIProvisioner(clientSet, csiClientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName) + csiProvisioner := NewCSIProvisioner(clientSet, csiClientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, false, "", driverName) out := &csi.CreateVolumeResponse{ Volume: &csi.Volume{