Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dynamic provisioning support for CSI migration scenarios #253

Merged
merged 2 commits into from
Mar 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 21 additions & 6 deletions cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (

utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflag "k8s.io/apiserver/pkg/util/flag"
csitranslationlib "k8s.io/csi-translation-lib"
)

var (
Expand Down Expand Up @@ -159,19 +160,33 @@ func init() {
timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + provisionerName

provisionerOptions := []func(*controller.ProvisionController) error{
controller.LeaderElection(*enableLeaderElection),
controller.FailedProvisionThreshold(0),
controller.FailedDeleteThreshold(0),
controller.RateLimiter(workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)),
controller.Threadiness(int(*workerThreads)),
}

supportsMigrationFromInTreePluginName := ""
if csitranslationlib.IsMigratedCSIDriverByName(provisionerName) {
supportsMigrationFromInTreePluginName, err = csitranslationlib.GetInTreeNameFromCSIName(provisionerName)
if err != nil {
klog.Fatalf("Failed to get InTree plugin name for migrated CSI plugin %s: %v", provisionerName, err)
}
klog.V(2).Infof("Supports migration from in-tree plugin: %s", supportsMigrationFromInTreePluginName)
provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
}

// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities, controllerCapabilities)
csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities, controllerCapabilities, supportsMigrationFromInTreePluginName)
provisionController = controller.NewProvisionController(
clientset,
provisionerName,
csiProvisioner,
serverVersion.GitVersion,
controller.LeaderElection(*enableLeaderElection),
controller.FailedProvisionThreshold(0),
controller.FailedDeleteThreshold(0),
controller.RateLimiter(workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)),
controller.Threadiness(int(*workerThreads)),
provisionerOptions...,
)
}

Expand Down
104 changes: 71 additions & 33 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
csitranslationlib "k8s.io/csi-translation-lib"
"k8s.io/klog"

"google.golang.org/grpc"
Expand Down Expand Up @@ -143,19 +144,20 @@ var (

// CSIProvisioner struct
type csiProvisioner struct {
client kubernetes.Interface
csiClient csi.ControllerClient
csiAPIClient csiclientset.Interface
grpcClient *grpc.ClientConn
snapshotClient snapclientset.Interface
timeout time.Duration
identity string
volumeNamePrefix string
volumeNameUUIDLength int
config *rest.Config
driverName string
pluginCapabilities connection.PluginCapabilitySet
controllerCapabilities connection.ControllerCapabilitySet
client kubernetes.Interface
csiClient csi.ControllerClient
csiAPIClient csiclientset.Interface
grpcClient *grpc.ClientConn
snapshotClient snapclientset.Interface
timeout time.Duration
identity string
volumeNamePrefix string
volumeNameUUIDLength int
config *rest.Config
driverName string
pluginCapabilities connection.PluginCapabilitySet
controllerCapabilities connection.ControllerCapabilitySet
supportsMigrationFromInTreePluginName string
}

var _ controller.Provisioner = &csiProvisioner{}
Expand Down Expand Up @@ -211,22 +213,24 @@ func NewCSIProvisioner(client kubernetes.Interface,
snapshotClient snapclientset.Interface,
driverName string,
pluginCapabilities connection.PluginCapabilitySet,
controllerCapabilities connection.ControllerCapabilitySet) controller.Provisioner {
controllerCapabilities connection.ControllerCapabilitySet,
supportsMigrationFromInTreePluginName string) controller.Provisioner {

csiClient := csi.NewControllerClient(grpcClient)
provisioner := &csiProvisioner{
client: client,
grpcClient: grpcClient,
csiClient: csiClient,
csiAPIClient: csiAPIClient,
snapshotClient: snapshotClient,
timeout: connectionTimeout,
identity: identity,
volumeNamePrefix: volumeNamePrefix,
volumeNameUUIDLength: volumeNameUUIDLength,
driverName: driverName,
pluginCapabilities: pluginCapabilities,
controllerCapabilities: controllerCapabilities,
client: client,
grpcClient: grpcClient,
csiClient: csiClient,
csiAPIClient: csiAPIClient,
snapshotClient: snapshotClient,
timeout: connectionTimeout,
identity: identity,
volumeNamePrefix: volumeNamePrefix,
volumeNameUUIDLength: volumeNameUUIDLength,
driverName: driverName,
pluginCapabilities: pluginCapabilities,
controllerCapabilities: controllerCapabilities,
supportsMigrationFromInTreePluginName: supportsMigrationFromInTreePluginName,
}
return provisioner
}
Expand Down Expand Up @@ -325,6 +329,31 @@ func getVolumeCapability(
}

func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.PersistentVolume, error) {
createVolumeRequestParameters := options.Parameters
migratedVolume := false
if p.supportsMigrationFromInTreePluginName != "" {
storageClassName := options.PVC.Spec.StorageClassName
// TODO(https://github.com/kubernetes-csi/external-provisioner/issues/256): use informers
// NOTE: we cannot depend on PVC.Annotations[volume.beta.kubernetes.io/storage-provisioner] to get
// the in-tree provisioner name in case of CSI migration scenarios. The annotation will be
// set to the CSI provisioner name by PV controller for migration scenarios
// so that external provisioner can correctly pick up the PVC pointing to an in-tree plugin
storageClass, err := p.client.StorageV1().StorageClasses().Get(*storageClassName, metav1.GetOptions{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a TODO(ISSUENUMBER): use informers

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the only reason for Get class is to get the provisioner name, there is one in PVC.Annotations["volume.beta.kubernetes.io/storage-provisioner"]. Can this be used instead? Or please add comment why it can't.

Copy link
Collaborator Author

@ddebroy ddebroy Mar 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. PVC.Annotations["volume.beta.kubernetes.io/storage-provisioner"] cannot be used for migration scenarios because it's set to the name of the CSI provisioner at https://github.com/kubernetes/kubernetes/blob/596a48dd64bcaa01c1d2515dc79a558a4466d463/pkg/controller/volume/persistentvolume/pv_controller.go#L1383 and https://github.com/kubernetes/kubernetes/blob/596a48dd64bcaa01c1d2515dc79a558a4466d463/pkg/controller/volume/persistentvolume/pv_controller.go#L1396 so that external CSI provisioner can detect the PVC and act on it. Will clarify with comment.

if err != nil {
return nil, fmt.Errorf("failed to get storage class named %s: %v", *storageClassName, err)
}
if storageClass.Provisioner == p.supportsMigrationFromInTreePluginName {
klog.V(2).Infof("translating storage class parameters for in-tree plugin %s to CSI", storageClass.Provisioner)
createVolumeRequestParameters, err = csitranslationlib.TranslateInTreeStorageClassParametersToCSI(p.supportsMigrationFromInTreePluginName, options.Parameters)
if err != nil {
return nil, fmt.Errorf("failed to translate storage class parameters: %v", err)
}
migratedVolume = true
} else {
klog.V(4).Infof("skip translation of storage class parameters for plugin: %s", storageClass.Provisioner)
}
}

var needSnapshotSupport bool
if options.PVC.Spec.DataSource != nil {
// PVC.Spec.DataSource.Name is the name of the VolumeSnapshot API object
Expand Down Expand Up @@ -355,7 +384,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis

fsTypesFound := 0
fsType := ""
for k, v := range options.Parameters {
for k, v := range createVolumeRequestParameters {
if strings.ToLower(k) == "fstype" || k == prefixedFsTypeKey {
fsType = v
fsTypesFound++
Expand Down Expand Up @@ -383,7 +412,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
// Create a CSI CreateVolumeRequest and Response
req := csi.CreateVolumeRequest{
Name: pvName,
Parameters: options.Parameters,
Parameters: createVolumeRequestParameters,
VolumeCapabilities: volumeCaps,
CapacityRange: &csi.CapacityRange{
RequiredBytes: int64(volSizeBytes),
Expand Down Expand Up @@ -418,7 +447,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis

// Resolve provision secret credentials.
// No PVC is provided when resolving provision/delete secret names, since the PVC may or may not exist at delete time.
provisionerSecretRef, err := getSecretReference(provisionerSecretParams, options.Parameters, pvName, nil)
provisionerSecretRef, err := getSecretReference(provisionerSecretParams, createVolumeRequestParameters, pvName, nil)
if err != nil {
return nil, err
}
Expand All @@ -429,20 +458,20 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
req.Secrets = provisionerCredentials

// Resolve controller publish, node stage, node publish secret references
controllerPublishSecretRef, err := getSecretReference(controllerPublishSecretParams, options.Parameters, pvName, options.PVC)
controllerPublishSecretRef, err := getSecretReference(controllerPublishSecretParams, createVolumeRequestParameters, pvName, options.PVC)
if err != nil {
return nil, err
}
nodeStageSecretRef, err := getSecretReference(nodeStageSecretParams, options.Parameters, pvName, options.PVC)
nodeStageSecretRef, err := getSecretReference(nodeStageSecretParams, createVolumeRequestParameters, pvName, options.PVC)
if err != nil {
return nil, err
}
nodePublishSecretRef, err := getSecretReference(nodePublishSecretParams, options.Parameters, pvName, options.PVC)
nodePublishSecretRef, err := getSecretReference(nodePublishSecretParams, createVolumeRequestParameters, pvName, options.PVC)
if err != nil {
return nil, err
}

req.Parameters, err = removePrefixedParameters(options.Parameters)
req.Parameters, err = removePrefixedParameters(createVolumeRequestParameters)
if err != nil {
return nil, fmt.Errorf("failed to strip CSI Parameters of prefixed keys: %v", err)
}
Expand Down Expand Up @@ -516,6 +545,15 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
pv.Spec.PersistentVolumeSource.CSI.FSType = fsType
}

if migratedVolume {
pv, err = csitranslationlib.TranslateCSIPVToInTree(pv)
if err != nil {
klog.Warningf("failed to translate CSI PV to in-tree due to: %v. Deleting provisioned PV", err)
p.Delete(pv)
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This leaves the provisioned volume orphaned in the storage backend. We can either delete it (possibly resulting in other errors, oh well...) or return an error that suggests there is an orphan that must be cleaned manually. Both are quite bad, I'd prefer deleting the volume though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, some log (or even an event?) that this is a critical bug and should be reported / fixed would be helpful.

}
}

klog.Infof("successfully created PV %+v", pv.Spec.PersistentVolumeSource)

return pv, nil
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) {
defer driver.Stop()

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(nil, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps)
csiProvisioner := NewCSIProvisioner(nil, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")

// Requested PVC with requestedBytes storage
opts := controller.VolumeOptions{
Expand Down Expand Up @@ -1164,7 +1164,7 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested
}

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps)
csiProvisioner := NewCSIProvisioner(clientSet, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1514,7 +1514,7 @@ func TestProvisionFromSnapshot(t *testing.T) {
})

pluginCaps, controllerCaps := provisionFromSnapshotCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName, pluginCaps, controllerCaps)
csiProvisioner := NewCSIProvisioner(clientSet, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName, pluginCaps, controllerCaps, "")

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1607,7 +1607,7 @@ func TestProvisionWithTopology(t *testing.T) {
clientSet := fakeclientset.NewSimpleClientset()
csiClientSet := fakecsiclientset.NewSimpleClientset()
pluginCaps, controllerCaps := provisionWithTopologyCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, csiClientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps)
csiProvisioner := NewCSIProvisioner(clientSet, csiClientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1648,7 +1648,7 @@ func TestProvisionWithMountOptions(t *testing.T) {
clientSet := fakeclientset.NewSimpleClientset()
csiClientSet := fakecsiclientset.NewSimpleClientset()
pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, csiClientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps)
csiProvisioner := NewCSIProvisioner(clientSet, csiClientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down
Loading