Skip to content

Commit

Permalink
Merge pull request kubernetes-csi#11 from jsafrane/alpha-api
Browse files Browse the repository at this point in the history
Update kubernetes to storage v1alpha1 API
  • Loading branch information
jsafrane authored Nov 15, 2017
2 parents d709591 + 5cd71c0 commit 79e88a3
Show file tree
Hide file tree
Showing 469 changed files with 24,916 additions and 12,657 deletions.
4 changes: 2 additions & 2 deletions cmd/csi-attacher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func main() {
} else {
pvLister := factory.Core().V1().PersistentVolumes().Lister()
nodeLister := factory.Core().V1().Nodes().Lister()
vaLister := factory.Storage().V1().VolumeAttachments().Lister()
vaLister := factory.Storage().V1alpha1().VolumeAttachments().Lister()
handler = controller.NewCSIHandler(clientset, attacher, csiConn, pvLister, nodeLister, vaLister)
glog.V(2).Infof("CSI driver supports ControllerPublishUnpublish, using real CSI handler")
}
Expand All @@ -118,7 +118,7 @@ func main() {
clientset,
attacher,
handler,
factory.Storage().V1().VolumeAttachments(),
factory.Storage().V1alpha1().VolumeAttachments(),
factory.Core().V1().PersistentVolumes(),
)

Expand Down
28 changes: 14 additions & 14 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ import (
"github.com/golang/glog"

"k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
storage "k8s.io/api/storage/v1alpha1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
coreinformerv1 "k8s.io/client-go/informers/core/v1"
storageinformerv1 "k8s.io/client-go/informers/storage/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1alpha1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
corelisterv1 "k8s.io/client-go/listers/core/v1"
storagelisterv1 "k8s.io/client-go/listers/storage/v1"
corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1alpha1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
Expand All @@ -45,9 +45,9 @@ type CSIAttachController struct {
vaQueue workqueue.RateLimitingInterface
pvQueue workqueue.RateLimitingInterface

vaLister storagelisterv1.VolumeAttachmentLister
vaLister storagelisters.VolumeAttachmentLister
vaListerSynced cache.InformerSynced
pvLister corelisterv1.PersistentVolumeLister
pvLister corelisters.PersistentVolumeLister
pvListerSynced cache.InformerSynced
}

Expand All @@ -62,13 +62,13 @@ type Handler interface {
// VolumeAttachment either as forgotten (resets exponential backoff) or
// re-queue it into the vaQueue to process it after exponential
// backoff.
SyncNewOrUpdatedVolumeAttachment(va *storagev1.VolumeAttachment)
SyncNewOrUpdatedVolumeAttachment(va *storage.VolumeAttachment)

SyncNewOrUpdatedPersistentVolume(pv *v1.PersistentVolume)
}

// NewCSIAttachController returns a new *CSIAttachController
func NewCSIAttachController(client kubernetes.Interface, attacherName string, handler Handler, volumeAttachmentInformer storageinformerv1.VolumeAttachmentInformer, pvInformer coreinformerv1.PersistentVolumeInformer) *CSIAttachController {
func NewCSIAttachController(client kubernetes.Interface, attacherName string, handler Handler, volumeAttachmentInformer storageinformers.VolumeAttachmentInformer, pvInformer coreinformers.PersistentVolumeInformer) *CSIAttachController {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.Core().Events(v1.NamespaceAll)})
var eventRecorder record.EventRecorder
Expand Down Expand Up @@ -124,22 +124,22 @@ func (ctrl *CSIAttachController) Run(workers int, stopCh <-chan struct{}) {

// vaAdded reacts to a VolumeAttachment creation
func (ctrl *CSIAttachController) vaAdded(obj interface{}) {
va := obj.(*storagev1.VolumeAttachment)
va := obj.(*storage.VolumeAttachment)
ctrl.vaQueue.Add(va.Name)
}

// vaUpdated reacts to a VolumeAttachment update
func (ctrl *CSIAttachController) vaUpdated(old, new interface{}) {
va := new.(*storagev1.VolumeAttachment)
va := new.(*storage.VolumeAttachment)
ctrl.vaQueue.Add(va.Name)
}

// vaDeleted reacts to a VolumeAttachment deleted
func (ctrl *CSIAttachController) vaDeleted(obj interface{}) {
va := obj.(*storagev1.VolumeAttachment)
if va != nil && va.Spec.AttachedVolumeSource.PersistentVolumeName != nil {
va := obj.(*storage.VolumeAttachment)
if va != nil && va.Spec.Source.PersistentVolumeName != nil {
// Enqueue PV sync event - it will evaluate and remove finalizer
ctrl.pvQueue.Add(*va.Spec.AttachedVolumeSource.PersistentVolumeName)
ctrl.pvQueue.Add(*va.Spec.Source.PersistentVolumeName)
}
}

Expand Down
56 changes: 28 additions & 28 deletions pkg/controller/csi_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ import (
"github.com/golang/glog"

"k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
storage "k8s.io/api/storage/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
corelister "k8s.io/client-go/listers/core/v1"
storagelister "k8s.io/client-go/listers/storage/v1"
corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1alpha1"
"k8s.io/client-go/util/workqueue"

"github.com/kubernetes-csi/external-attacher/pkg/connection"
Expand All @@ -41,9 +41,9 @@ type csiHandler struct {
client kubernetes.Interface
attacherName string
csiConnection connection.CSIConnection
pvLister corelister.PersistentVolumeLister
nodeLister corelister.NodeLister
vaLister storagelister.VolumeAttachmentLister
pvLister corelisters.PersistentVolumeLister
nodeLister corelisters.NodeLister
vaLister storagelisters.VolumeAttachmentLister
vaQueue, pvQueue workqueue.RateLimitingInterface
}

Expand All @@ -53,9 +53,9 @@ func NewCSIHandler(
client kubernetes.Interface,
attacherName string,
csiConnection connection.CSIConnection,
pvLister corelister.PersistentVolumeLister,
nodeLister corelister.NodeLister,
vaLister storagelister.VolumeAttachmentLister) Handler {
pvLister corelisters.PersistentVolumeLister,
nodeLister corelisters.NodeLister,
vaLister storagelisters.VolumeAttachmentLister) Handler {

return &csiHandler{
client: client,
Expand All @@ -72,7 +72,7 @@ func (h *csiHandler) Init(vaQueue workqueue.RateLimitingInterface, pvQueue workq
h.pvQueue = pvQueue
}

func (h *csiHandler) SyncNewOrUpdatedVolumeAttachment(va *storagev1.VolumeAttachment) {
func (h *csiHandler) SyncNewOrUpdatedVolumeAttachment(va *storage.VolumeAttachment) {
glog.V(4).Infof("CSIHandler: processing VA %q", va.Name)

var err error
Expand All @@ -92,7 +92,7 @@ func (h *csiHandler) SyncNewOrUpdatedVolumeAttachment(va *storagev1.VolumeAttach
glog.V(4).Infof("CSIHandler: finished processing %q", va.Name)
}

func (h *csiHandler) syncAttach(va *storagev1.VolumeAttachment) error {
func (h *csiHandler) syncAttach(va *storage.VolumeAttachment) error {
if va.Status.Attached {
// Volume is attached, there is nothing to be done.
glog.V(4).Infof("%q is already attached", va.Name)
Expand Down Expand Up @@ -123,7 +123,7 @@ func (h *csiHandler) syncAttach(va *storagev1.VolumeAttachment) error {
return nil
}

func (h *csiHandler) syncDetach(va *storagev1.VolumeAttachment) error {
func (h *csiHandler) syncDetach(va *storage.VolumeAttachment) error {
glog.V(4).Infof("Starting detach operation for %q", va.Name)
if !h.hasVAFinalizer(va) {
glog.V(4).Infof("%q is already detached", va.Name)
Expand All @@ -148,7 +148,7 @@ func (h *csiHandler) syncDetach(va *storagev1.VolumeAttachment) error {
return nil
}

func (h *csiHandler) addVAFinalizer(va *storagev1.VolumeAttachment) (*storagev1.VolumeAttachment, error) {
func (h *csiHandler) addVAFinalizer(va *storage.VolumeAttachment) (*storage.VolumeAttachment, error) {
finalizerName := connection.GetFinalizerName(h.attacherName)
for _, f := range va.Finalizers {
if f == finalizerName {
Expand All @@ -163,7 +163,7 @@ func (h *csiHandler) addVAFinalizer(va *storagev1.VolumeAttachment) (*storagev1.
clone := va.DeepCopy()
clone.Finalizers = append(clone.Finalizers, finalizerName)
// TODO: use patch to save us from VersionError
newVA, err := h.client.StorageV1().VolumeAttachments().Update(clone)
newVA, err := h.client.StorageV1alpha1().VolumeAttachments().Update(clone)
if err != nil {
return va, err
}
Expand Down Expand Up @@ -194,7 +194,7 @@ func (h *csiHandler) addPVFinalizer(pv *v1.PersistentVolume) (*v1.PersistentVolu
return newPV, nil
}

func (h *csiHandler) hasVAFinalizer(va *storagev1.VolumeAttachment) bool {
func (h *csiHandler) hasVAFinalizer(va *storage.VolumeAttachment) bool {
finalizerName := connection.GetFinalizerName(h.attacherName)
for _, f := range va.Finalizers {
if f == finalizerName {
Expand All @@ -204,16 +204,16 @@ func (h *csiHandler) hasVAFinalizer(va *storagev1.VolumeAttachment) bool {
return false
}

func (h *csiHandler) csiAttach(va *storagev1.VolumeAttachment) (*storagev1.VolumeAttachment, map[string]string, error) {
func (h *csiHandler) csiAttach(va *storage.VolumeAttachment) (*storage.VolumeAttachment, map[string]string, error) {
glog.V(4).Infof("Starting attach operation for %q", va.Name)
// Check as much as possible before adding VA finalizer - it would block
// deletion of VA on error.

if va.Spec.PersistentVolumeName == nil {
if va.Spec.Source.PersistentVolumeName == nil {
return va, nil, fmt.Errorf("VolumeAttachment.spec.persistentVolumeName is empty")
}

pv, err := h.pvLister.Get(*va.Spec.PersistentVolumeName)
pv, err := h.pvLister.Get(*va.Spec.Source.PersistentVolumeName)
if err != nil {
return va, nil, err
}
Expand Down Expand Up @@ -260,12 +260,12 @@ func (h *csiHandler) csiAttach(va *storagev1.VolumeAttachment) (*storagev1.Volum
return va, publishInfo, nil
}

func (h *csiHandler) csiDetach(va *storagev1.VolumeAttachment) (*storagev1.VolumeAttachment, error) {
if va.Spec.PersistentVolumeName == nil {
func (h *csiHandler) csiDetach(va *storage.VolumeAttachment) (*storage.VolumeAttachment, error) {
if va.Spec.Source.PersistentVolumeName == nil {
return va, fmt.Errorf("VolumeAttachment.spec.persistentVolumeName is empty")
}

pv, err := h.pvLister.Get(*va.Spec.PersistentVolumeName)
pv, err := h.pvLister.Get(*va.Spec.Source.PersistentVolumeName)
if err != nil {
return va, err
}
Expand Down Expand Up @@ -303,29 +303,29 @@ func (h *csiHandler) csiDetach(va *storagev1.VolumeAttachment) (*storagev1.Volum
return va, nil
}

func (h *csiHandler) saveAttachError(va *storagev1.VolumeAttachment, err error) (*storagev1.VolumeAttachment, error) {
func (h *csiHandler) saveAttachError(va *storage.VolumeAttachment, err error) (*storage.VolumeAttachment, error) {
glog.V(4).Infof("Saving attach error to %q", va.Name)
clone := va.DeepCopy()
clone.Status.AttachError = &storagev1.VolumeError{
clone.Status.AttachError = &storage.VolumeError{
Message: err.Error(),
Time: metav1.Now(),
}
newVa, err := h.client.StorageV1().VolumeAttachments().Update(clone)
newVa, err := h.client.StorageV1alpha1().VolumeAttachments().Update(clone)
if err != nil {
return va, err
}
glog.V(4).Infof("Saved attach error to %q", va.Name)
return newVa, nil
}

func (h *csiHandler) saveDetachError(va *storagev1.VolumeAttachment, err error) (*storagev1.VolumeAttachment, error) {
func (h *csiHandler) saveDetachError(va *storage.VolumeAttachment, err error) (*storage.VolumeAttachment, error) {
glog.V(4).Infof("Saving detach error to %q", va.Name)
clone := va.DeepCopy()
clone.Status.DetachError = &storagev1.VolumeError{
clone.Status.DetachError = &storage.VolumeError{
Message: err.Error(),
Time: metav1.Now(),
}
newVa, err := h.client.StorageV1().VolumeAttachments().Update(clone)
newVa, err := h.client.StorageV1alpha1().VolumeAttachments().Update(clone)
if err != nil {
return va, err
}
Expand Down Expand Up @@ -368,7 +368,7 @@ func (h *csiHandler) SyncNewOrUpdatedPersistentVolume(pv *v1.PersistentVolume) {
return
}
for _, va := range vas {
if va.Spec.AttachedVolumeSource.PersistentVolumeName != nil && *va.Spec.AttachedVolumeSource.PersistentVolumeName == pv.Name {
if va.Spec.Source.PersistentVolumeName != nil && *va.Spec.Source.PersistentVolumeName == pv.Name {
// This PV is needed by this VA, don't remove finalizer
glog.V(4).Infof("CSIHandler: processing PV %q: VA %q found", pv.Name, va.Name)
h.pvQueue.Forget(pv.Name)
Expand Down
18 changes: 9 additions & 9 deletions pkg/controller/csi_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/kubernetes-csi/external-attacher/pkg/connection"

"k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
storage "k8s.io/api/storage/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -46,7 +46,7 @@ func csiHandlerFactory(client kubernetes.Interface, informerFactory informers.Sh
csi,
informerFactory.Core().V1().PersistentVolumes().Lister(),
informerFactory.Core().V1().Nodes().Lister(),
informerFactory.Storage().V1().VolumeAttachments().Lister())
informerFactory.Storage().V1alpha1().VolumeAttachments().Lister())
}

func pv() *v1.PersistentVolume {
Expand Down Expand Up @@ -98,8 +98,8 @@ func node() *v1.Node {

func TestCSIHandler(t *testing.T) {
vaGroupResourceVersion := schema.GroupVersionResource{
Group: storagev1.GroupName,
Version: "v1",
Group: storage.GroupName,
Version: "v1alpha1",
Resource: "volumeattachments",
}
pvGroupResourceVersion := schema.GroupVersionResource{
Expand Down Expand Up @@ -253,7 +253,7 @@ func TestCSIHandler(t *testing.T) {
i++
if i < 3 {
// Update fails 2 times
return true, nil, apierrors.NewForbidden(storagev1.Resource("volumeattachments"), "pv1-node1", errors.New("Mock error"))
return true, nil, apierrors.NewForbidden(storage.Resource("volumeattachments"), "pv1-node1", errors.New("Mock error"))
}
// Update succeeds for the 3rd time
return false, nil, nil
Expand Down Expand Up @@ -297,7 +297,7 @@ func TestCSIHandler(t *testing.T) {
i++
if i < 3 {
// Update fails 2 times
return true, nil, apierrors.NewForbidden(storagev1.Resource("volumeattachments"), "pv1-node1", errors.New("Mock error"))
return true, nil, apierrors.NewForbidden(storage.Resource("volumeattachments"), "pv1-node1", errors.New("Mock error"))
}
// Update succeeds for the 3rd time
return false, nil, nil
Expand Down Expand Up @@ -333,7 +333,7 @@ func TestCSIHandler(t *testing.T) {
if i != 2 {
return false, nil, nil
}
return true, nil, apierrors.NewForbidden(storagev1.Resource("volumeattachments"), "pv1-node1", errors.New("mock error"))
return true, nil, apierrors.NewForbidden(storage.Resource("volumeattachments"), "pv1-node1", errors.New("mock error"))
}
},
},
Expand Down Expand Up @@ -432,7 +432,7 @@ func TestCSIHandler(t *testing.T) {
i++
if i < 3 {
// Update fails 2 times
return true, nil, apierrors.NewForbidden(storagev1.Resource("volumeattachments"), "pv1-node1", errors.New("Mock error"))
return true, nil, apierrors.NewForbidden(storage.Resource("volumeattachments"), "pv1-node1", errors.New("Mock error"))
}
// Update succeeds for the 3rd time
return false, nil, nil
Expand Down Expand Up @@ -475,7 +475,7 @@ func TestCSIHandler(t *testing.T) {
return func(core.Action) (bool, runtime.Object, error) {
i++
if i == 1 {
return true, nil, apierrors.NewForbidden(storagev1.Resource("volumeattachments"), "pv1-node1", errors.New("mock error"))
return true, nil, apierrors.NewForbidden(storage.Resource("volumeattachments"), "pv1-node1", errors.New("mock error"))
}
return false, nil, nil
}
Expand Down
Loading

0 comments on commit 79e88a3

Please sign in to comment.