Skip to content

Commit

Permalink
Add labeling for checking whether driver/node combination supports mi…
Browse files Browse the repository at this point in the history
…gration
  • Loading branch information
davidz627 committed Oct 26, 2018
1 parent 68b1d5e commit b582653
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (plugin *TestPlugin) CanSupport(spec *volume.Spec) bool {
return true
}

func (plugin *TestPlugin) IsMigratedToCSI() bool {
func (plugin *TestPlugin) IsMigratableToCSI() bool {
return false
}

Expand Down
1 change: 1 addition & 0 deletions pkg/volume/csi/nodeinfomanager/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi/v0:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/kubernetes-csi/kubernetes-csi-migration-library/plugins:go_default_library",
],
)

Expand Down
36 changes: 30 additions & 6 deletions pkg/volume/csi/nodeinfomanager/nodeinfomanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"

csiPlugins "github.com/kubernetes-csi/kubernetes-csi-migration-library/plugins"
)

const (
Expand All @@ -45,6 +47,12 @@ const (

var nodeKind = v1.SchemeGroupVersion.WithKind("Node")

var migratedDrivers = map[string](func() bool){
csiPlugins.GCEPDDriverName: func() bool {
return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE)
},
}

// nodeInfoManager contains necessary common dependencies to update node info on both
// the Node and CSINodeInfo objects.
type nodeInfoManager struct {
Expand Down Expand Up @@ -370,6 +378,15 @@ func (nim *nodeInfoManager) createNodeInfoObject(
return err // do not wrap error
}

isMigratable := false
if driverIsMigratableFunc, ok := migratedDrivers[driverName]; ok {
isMigratable = driverIsMigratableFunc()
glog.V(4).Infof("CSI Driver %v found in migrated driver list, and migration status is %v", driverName, isMigratable)

} else {
glog.V(4).Infof("CSI Driver %v not found in migrated driver map", driverName)
}

nodeInfo := &csiv1alpha1.CSINodeInfo{
ObjectMeta: metav1.ObjectMeta{
Name: string(nim.nodeName),
Expand All @@ -384,9 +401,10 @@ func (nim *nodeInfoManager) createNodeInfoObject(
},
CSIDrivers: []csiv1alpha1.CSIDriverInfo{
{
Driver: driverName,
NodeID: driverNodeID,
TopologyKeys: topologyKeys,
Driver: driverName,
NodeID: driverNodeID,
TopologyKeys: topologyKeys,
IsDriverMigratableOnNode: isMigratable,
},
},
}
Expand Down Expand Up @@ -429,11 +447,17 @@ func (nim *nodeInfoManager) updateNodeInfoObject(
}
}

isMigratable := false
if driverIsMigratableFunc, ok := migratedDrivers[driverName]; ok {
isMigratable = driverIsMigratableFunc()
}

// Append new driver
driverInfo := csiv1alpha1.CSIDriverInfo{
Driver: driverName,
NodeID: driverNodeID,
TopologyKeys: topologyKeys.List(),
Driver: driverName,
NodeID: driverNodeID,
TopologyKeys: topologyKeys.List(),
IsDriverMigratableOnNode: isMigratable,
}
newDriverInfos = append(newDriverInfos, driverInfo)
nodeInfo.CSIDrivers = newDriverInfos
Expand Down
6 changes: 4 additions & 2 deletions pkg/volume/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,8 @@ func (pm *VolumePluginMgr) IsPluginMigratableBySpec(spec *Spec) (bool, error) {
}

if len(matches) == 0 {
return false, fmt.Errorf("no volume plugin matched")
// Not a known plugin (flex) in which case it is not migratable
return false, nil
}
if len(matches) > 1 {
return false, fmt.Errorf("multiple volume plugins matched: %s", strings.Join(matchedPluginNames, ","))
Expand All @@ -571,7 +572,8 @@ func (pm *VolumePluginMgr) IsPluginMigratableByName(name string) (bool, error) {
}
}
if len(matches) == 0 {
return false, fmt.Errorf("no volume plugin matched")
// Not a known plugin (flex) in which case it is not migratable
return false, nil
}
if len(matches) > 1 {
return false, fmt.Errorf("multiple volume plugins matched: %s", strings.Join(matchedPluginNames, ","))
Expand Down
4 changes: 4 additions & 0 deletions pkg/volume/util/operationexecutor/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/listers/csi/v1alpha1:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/kubernetes-csi/kubernetes-csi-migration-library:go_default_library",
],
Expand Down
155 changes: 136 additions & 19 deletions pkg/volume/util/operationexecutor/operation_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
csiapi "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
csiapiinformer "k8s.io/csi-api/pkg/client/informers/externalversions"
csiinformerlisters "k8s.io/csi-api/pkg/client/listers/csi/v1alpha1"
expandcache "k8s.io/kubernetes/pkg/controller/volume/expand/cache"
"k8s.io/kubernetes/pkg/features"
kevents "k8s.io/kubernetes/pkg/kubelet/events"
Expand Down Expand Up @@ -65,6 +69,16 @@ type operationGenerator struct {

// blkUtil provides volume path related operations for block volume
blkUtil volumepathhandler.BlockVolumePathHandler

// csiNodeInfoLister is an informer for the CSINodeInfo CRs
// It currently can only be used on the attach/detach controller
// and only when the CSINodeInfo CRD is installed
csiNodeInfoLister csiinformerlisters.CSINodeInfoLister

// csiNodeInfoHasSynced returns true if the informer is running and is synced
// It currently can only be used on the attach/detach controller
// and only when the CSINodeInfo CRD is installed
csiNodeInfoHasSynced func() bool
}

// NewOperationGenerator is returns instance of operationGenerator
Expand All @@ -73,14 +87,29 @@ func NewOperationGenerator(kubeClient clientset.Interface,
recorder record.EventRecorder,
checkNodeCapabilitiesBeforeMount bool,
blkUtil volumepathhandler.BlockVolumePathHandler) OperationGenerator {

return &operationGenerator{
kubeClient: kubeClient,
volumePluginMgr: volumePluginMgr,
recorder: recorder,
og := &operationGenerator{
kubeClient: kubeClient,
volumePluginMgr: volumePluginMgr,
recorder: recorder,
checkNodeCapabilitiesBeforeMount: checkNodeCapabilitiesBeforeMount,
blkUtil: blkUtil,

blkUtil: blkUtil,
}

if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
csiKubeClient := volumePluginMgr.Host.GetCSIClient()
if csiKubeClient == nil {
glog.Warningf("The client for CSI Custom Resources is not available, skipping informer initialization")
} else {
factory := csiapiinformer.NewSharedInformerFactory(csiKubeClient, 1*time.Minute)
csiNodeInfos := factory.Csi().V1alpha1().CSINodeInfos()
og.csiNodeInfoHasSynced = csiNodeInfos.Informer().HasSynced
og.csiNodeInfoLister = csiNodeInfos.Lister()
go factory.Start(wait.NeverStop)
}
}

return og
}

// OperationGenerator interface that extracts out the functions from operation_executor to make it dependency injectable
Expand Down Expand Up @@ -300,8 +329,13 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(
}

originalSpec := volumeToAttach.VolumeSpec
// isMigrated will check both CSIMigration and the plugin specific feature gate
if isMigrated(og.volumePluginMgr, volumeToAttach.VolumeSpec) {

// isMigratable will check both CSIMigration and the plugin specific feature gate and Node Migration
im, err := isMigratable(og, volumeToAttach.VolumeSpec, volumeToAttach.VolumeName, volumeToAttach.NodeName)
if err != nil {
return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.IsMigratable failed", err)
}
if im {
// The volume represented by this spec is CSI and thus should be migrated
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
if err != nil || attachableVolumePlugin == nil {
Expand Down Expand Up @@ -392,17 +426,21 @@ func (og *operationGenerator) GenerateDetachVolumeFunc(

if volumeToDetach.VolumeSpec != nil {
// Get attacher plugin
// isMigrated will check both CSIMigration and the plugin specific feature gate
if isMigrated(og.volumePluginMgr, volumeToDetach.VolumeSpec) {
// isMigratable will check both CSIMigration and the plugin specific feature gate
im, err := isMigratable(og, volumeToDetach.VolumeSpec, volumeToDetach.VolumeName, volumeToDetach.NodeName)
if err != nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.IsMigratable failed", err)
}
if im {
// The volume represented by this spec is CSI and thus should be migrated
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
if err != nil || attachableVolumePlugin == nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err)
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err)
}

csiSpec, err := translateSpec(volumeToDetach.VolumeSpec)
if err != nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.TranslateSpec failed", err)
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.TranslateSpec failed", err)
}

volumeToDetach.VolumeSpec = csiSpec
Expand Down Expand Up @@ -430,8 +468,12 @@ func (og *operationGenerator) GenerateDetachVolumeFunc(

// TODO(dyzz): This case can't distinguish between PV and In-line which is necessary because
// if it was PV it may have been migrated, but the same plugin with in-line may not have been.
// Suggestions welcome...
if csiMigration.IsMigratedByName(pluginName) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
// fixing this depends on CSI in-line volumes implementation: PR #68232
inm, err := isNodeMigratable(og, pluginName, volumeToDetach.NodeName)
if err != nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.IsNodeMigratable failed", err)
}
if csiMigration.IsMigratedByName(pluginName) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && inm {
// The volume represented by this spec is CSI and thus should be migrated
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
if err != nil || attachableVolumePlugin == nil {
Expand Down Expand Up @@ -1504,14 +1546,89 @@ func isDeviceOpened(deviceToDetach AttachedVolume, mounter mount.Interface) (boo
return deviceOpened, nil
}

func isMigrated(vpm *volume.VolumePluginMgr, spec *volume.Spec) bool {
func isNodeMigratable(og *operationGenerator, pluginName string, nodeName types.NodeName) (bool, error) {
// TODO(dyzz): Write a test that runs with feature flags on, tries to do an inline volume with a migratable volume but without
// driver installed. Should get the correct error message. Then clean up. Try to do a migratable volume but WITH driver installed,
// should succeed (even better if we can check that the driver did something).
var err error
var nodeInfo *csiapi.CSINodeInfo

pm, err := og.volumePluginMgr.IsPluginMigratableByName(pluginName)
if err != nil {
return false, err
}

if !pm {
// Feature flags aren't on so migration as a whole is just turned off
return false, nil
}

if !utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
return false, fmt.Errorf("failed to check if node migrated, CSINodeInfo feature gate not enabled")
}

if og.csiNodeInfoHasSynced() {
nodeInfo, err = og.csiNodeInfoLister.Get(string(nodeName))
if err != nil {
return false, fmt.Errorf("failed to get CSI node info for node %v from informer: %v", string(nodeName), err)
}
} else {
glog.Warningf("CSINodeInfo informer not synced, please check that CSINodeInfo CRD is installed. If so, this warning should not appear again after ~1 minute")
// Fallback to a GET
nodeInfo, err = og.volumePluginMgr.Host.GetCSIClient().CsiV1alpha1().CSINodeInfos().Get(string(nodeName), metav1.GetOptions{})
if err != nil {
return false, fmt.Errorf("failed to get CSI node info for node %v: %v", string(nodeName), err)
}
}

driverName, err := csiMigration.GetCSINameFromIntreeName(pluginName)
if err != nil {
return false, err
}

for _, driver := range nodeInfo.CSIDrivers {
if driver.Driver == driverName {
return driver.IsDriverMigratableOnNode, nil
}
}
// The plugin is migrated but the driver is not installed
return false, fmt.Errorf("plugin %v is migratable but driver %v is not installed. Please install the driver and retry", pluginName, driverName)
}

func isMigratable(og *operationGenerator, spec *volume.Spec, uniqueVolumeName v1.UniqueVolumeName, nodeName types.NodeName) (bool, error) {
// IsPluginMigratableBySpec tests whether feature flags are on
pm, err := og.volumePluginMgr.IsPluginMigratableBySpec(spec)
if err != nil {
return false, err
}

if !pm {
// Feature flags aren't on so migration as a whole is just turned off
return false, nil
}

pluginName, _, err := util.SplitUniqueName(uniqueVolumeName)
if err != nil {
return false, fmt.Errorf("failed to split unique name %v: %v", uniqueVolumeName, err)
}
driverName, err := csiMigration.GetCSINameFromIntreeName(pluginName)
if err != nil {
return false, err
}

if csiMigration.IsPVMigrated(spec.PersistentVolume) || csiMigration.IsInlineMigrated(spec.Volume) {
migratable, err := vpm.IsPluginMigratableBySpec(spec)
if err == nil && migratable {
return true
inm, err := isNodeMigratable(og, pluginName, nodeName)
if err != nil {
return false, fmt.Errorf("failed to check if driver migrated on node: %v", err)
}
if inm {
return true, nil
}
}
return false
// If feature flags are on but we're not migratable for some other reason
// it is an error and a good error about installing the driver should be thrown
return false, fmt.Errorf("plugin %v is migratable but driver %v is not installed. Please install the driver and retry", pluginName, driverName)

}

func translateSpec(spec *volume.Spec) (*volume.Spec, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "watch", "list").Groups("csi.storage.k8s.io").Resources("csidrivers").RuleOrDie())
}
if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "watch", "list").Groups("csi.storage.k8s.io").Resources("csinodeinfos").RuleOrDie())
}
}

return role
Expand Down
6 changes: 6 additions & 0 deletions staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ type CSIDriverInfo struct {
// determine which labels it should retrieve from the node object and pass
// back to the driver.
TopologyKeys []string `json:"topologyKeys"`

// IsDriverMigratableOnNode is a boolean representing whether the node supports
// migration to CSI for this driver. It is used by storage controllers on
// the control plane to determine whether to use a migrated version of the
// plugin for this driver on this node.
IsDriverMigratableOnNode bool `json:"isDriverMigratableOnNode"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down

0 comments on commit b582653

Please sign in to comment.