diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 43895ddfd..821259db8 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -51,9 +51,8 @@ type CSIAttachController struct { // Handler is responsible for handling VolumeAttachment events from informer. type Handler interface { // SyncNewOrUpdatedVolumeAttachment processes one Add/Updated event from - // VolumeAttachment informers. It runs in a workqueue and should be - // reasonably fast (i.e. talking to API server is OK, talking to CSI is - // not). + // VolumeAttachment informers. It runs in a workqueue, guaranting that only + // one SyncNewOrUpdatedVolumeAttachment runs for given VA. // SyncNewOrUpdatedVolumeAttachment is responsible for marking the // VolumeAttachment either as forgotten (resets exponential backoff) or // re-queue it into the provided queue to process it after exponential @@ -130,21 +129,19 @@ func (ctrl *CSIAttachController) processNextWorkItem() bool { defer ctrl.queue.Done(key) vaName := key.(string) - glog.V(4).Infof("work for VolumeAttachment %s started", vaName) + glog.V(4).Infof("Started processing %q", vaName) // get VolumeAttachment to process va, err := ctrl.vaLister.Get(vaName) if err != nil { if apierrs.IsNotFound(err) { // VolumeAttachment was deleted in the meantime, ignore. - // This will remove the VolumeAttachment from queue. - glog.V(4).Infof("%s deleted, ignoring", vaName) + glog.V(3).Infof("%q deleted, ignoring", vaName) return true } - if err != nil { - glog.Errorf("Error getting VolumeAttachment %s: %v", vaName, err) - ctrl.queue.AddRateLimited(vaName) - } + glog.Errorf("Error getting VolumeAttachment %q: %v", vaName, err) + ctrl.queue.AddRateLimited(vaName) + return true } if va.Spec.Attacher != ctrl.attacherName { glog.V(4).Infof("Skipping VolumeAttachment %s for attacher %s", va.Name, va.Spec.Attacher) diff --git a/pkg/controller/csi_handler.go b/pkg/controller/csi_handler.go new file mode 100644 index 000000000..c165f252c --- /dev/null +++ b/pkg/controller/csi_handler.go @@ -0,0 +1,254 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + + "github.com/golang/glog" + + storagev1 "k8s.io/api/storage/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + corelister "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/util/workqueue" + + "github.com/kubernetes-csi/external-attacher-csi/pkg/connection" +) + +// csiHandler is a handler that calls CSI to attach/detach volume. +// It adds finalizer to VolumeAttachment instance to make sure they're detached +// before deletion. +type csiHandler struct { + client kubernetes.Interface + attacherName string + csiConnection connection.CSIConnection + pvLister corelister.PersistentVolumeLister + nodeLister corelister.NodeLister +} + +var _ Handler = &csiHandler{} + +func NewCSIHandler( + client kubernetes.Interface, + attacherName string, + csiConnection connection.CSIConnection, + pvLister corelister.PersistentVolumeLister, + nodeLister corelister.NodeLister) Handler { + + return &csiHandler{ + client, + attacherName, + csiConnection, + pvLister, + nodeLister, + } +} + +func (h *csiHandler) SyncNewOrUpdatedVolumeAttachment(va *storagev1.VolumeAttachment, queue workqueue.RateLimitingInterface) { + glog.V(4).Infof("CSIHandler: processing %q", va.Name) + + var err error + if va.DeletionTimestamp == nil { + err = h.syncAttach(va) + } else { + err = h.syncDetach(va) + } + if err != nil { + // Re-queue with exponential backoff + glog.V(2).Infof("Error processing %q: %s", va.Name, err) + queue.AddRateLimited(va.Name) + return + } + // The operation has finished successfully, reset exponential backoff + queue.Forget(va.Name) + glog.V(4).Infof("CSIHandler: finished processing %q", va.Name) +} + +func (h *csiHandler) syncAttach(va *storagev1.VolumeAttachment) error { + glog.V(4).Infof("Starting attach operation for %q", va.Name) + va, err := h.addVAFinalizer(va) + if err != nil { + return fmt.Errorf("could not add finalizer: %s", err) + } + + if va.Status.Attached { + // Volume is attached, there is nothing to be done. + glog.V(4).Infof("%q is already attached", va.Name) + return nil + } + + // Attach + glog.V(2).Infof("Attaching %q", va.Name) + metadata, err := h.csiAttach(va) + if err != nil { + var saveErr error + va, saveErr = h.saveAttachError(va, err) + if saveErr != nil { + // Just log it, propagate the attach error. + glog.V(2).Infof("Failed to save attach error to %q: %s", va.Name, saveErr.Error()) + } + // Add context to the error for logging + err := fmt.Errorf("failed to attach: %s", err) + return err + } + glog.V(2).Infof("Attached %q", va.Name) + + // Mark as attached + if _, err := markAsAttached(h.client, va, metadata); err != nil { + return fmt.Errorf("failed to mark as attached: %s", err) + } + glog.V(4).Infof("Fully attached %q", va.Name) + return nil +} + +func (h *csiHandler) syncDetach(va *storagev1.VolumeAttachment) error { + glog.V(4).Infof("Starting detach operation for %q", va.Name) + if !h.hasVAFinalizer(va) { + glog.V(4).Infof("%q is already detached", va.Name) + return nil + } + + glog.V(2).Infof("Detaching %q", va.Name) + if err := h.csiDetach(va); err != nil { + var saveErr error + va, saveErr = h.saveDetachError(va, err) + if saveErr != nil { + // Just log it, propagate the detach error. + glog.V(2).Infof("Failed to save detach error to %q: %s", va.Name, saveErr.Error()) + } + // Add context to the error for logging + err := fmt.Errorf("failed to detach: %s", err) + return err + } + glog.V(2).Infof("Detached %q", va.Name) + + if _, err := markAsDetached(h.client, va); err != nil { + return fmt.Errorf("could not mark as detached: %s", err) + } + glog.V(4).Infof("Fully detached %q", va.Name) + return nil +} + +func (h *csiHandler) addVAFinalizer(va *storagev1.VolumeAttachment) (*storagev1.VolumeAttachment, error) { + finalizerName := getFinalizerName(h.attacherName) + for _, f := range va.Finalizers { + if f == finalizerName { + // Finalizer is already present + glog.V(4).Infof("Finalizer is already set on %q", va.Name) + return va, nil + } + } + + // Finalizer is not present, add it + glog.V(4).Infof("Adding finalizer to %q", va.Name) + clone := va.DeepCopy() + clone.Finalizers = append(clone.Finalizers, finalizerName) + // TODO: use patch to save us from VersionError + newVA, err := h.client.StorageV1().VolumeAttachments().Update(clone) + if err != nil { + return va, err + } + glog.V(4).Infof("Finalizer added to %q", va.Name) + return newVA, nil +} + +func (h *csiHandler) hasVAFinalizer(va *storagev1.VolumeAttachment) bool { + finalizerName := getFinalizerName(h.attacherName) + for _, f := range va.Finalizers { + if f == finalizerName { + return true + } + } + return false +} + +func (h *csiHandler) csiAttach(va *storagev1.VolumeAttachment) (map[string]string, error) { + if va.Spec.PersistentVolumeName == nil { + return nil, fmt.Errorf("VolumeAttachment.spec.persistentVolumeName is empty") + } + + pv, err := h.pvLister.Get(*va.Spec.PersistentVolumeName) + if err != nil { + return nil, err + } + node, err := h.nodeLister.Get(va.Spec.NodeName) + if err != nil { + return nil, err + } + + ctx := context.TODO() + publishInfo, err := h.csiConnection.Attach(ctx, pv, node) + if err != nil { + return nil, err + } + + return publishInfo, nil +} + +func (h *csiHandler) csiDetach(va *storagev1.VolumeAttachment) error { + if va.Spec.PersistentVolumeName == nil { + return fmt.Errorf("VolumeAttachment.spec.persistentVolumeName is empty") + } + + pv, err := h.pvLister.Get(*va.Spec.PersistentVolumeName) + if err != nil { + return err + } + node, err := h.nodeLister.Get(va.Spec.NodeName) + if err != nil { + return err + } + + ctx := context.TODO() + if err := h.csiConnection.Detach(ctx, pv, node); err != nil { + return err + } + + return nil +} + +func (h *csiHandler) saveAttachError(va *storagev1.VolumeAttachment, err error) (*storagev1.VolumeAttachment, error) { + glog.V(4).Infof("Saving attach error to %q", va.Name) + clone := va.DeepCopy() + clone.Status.AttachError = &storagev1.VolumeError{ + Message: err.Error(), + Time: metav1.Now(), + } + newVa, err := h.client.StorageV1().VolumeAttachments().Update(clone) + if err != nil { + return va, err + } + glog.V(4).Infof("Saved attach error to %q", va.Name) + return newVa, nil +} + +func (h *csiHandler) saveDetachError(va *storagev1.VolumeAttachment, err error) (*storagev1.VolumeAttachment, error) { + glog.V(4).Infof("Saving detach error to %q", va.Name) + clone := va.DeepCopy() + clone.Status.DetachError = &storagev1.VolumeError{ + Message: err.Error(), + Time: metav1.Now(), + } + newVa, err := h.client.StorageV1().VolumeAttachments().Update(clone) + if err != nil { + return va, err + } + glog.V(4).Infof("Saved detach error to %q", va.Name) + return newVa, nil +} diff --git a/pkg/controller/csi_handler_test.go b/pkg/controller/csi_handler_test.go new file mode 100644 index 000000000..be56de04d --- /dev/null +++ b/pkg/controller/csi_handler_test.go @@ -0,0 +1,381 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "errors" + "fmt" + "testing" + + "github.com/kubernetes-csi/external-attacher-csi/pkg/connection" + + "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + core "k8s.io/client-go/testing" +) + +func csiHandlerFactory(client kubernetes.Interface, informerFactory informers.SharedInformerFactory, csi connection.CSIConnection) Handler { + return NewCSIHandler(client, testAttacherName, csi, informerFactory.Core().V1().PersistentVolumes().Lister(), informerFactory.Core().V1().Nodes().Lister()) +} + +func pv() *v1.PersistentVolume { + return &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: testPVName, + }, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + CSI: &v1.CSIPersistentVolumeSource{ + Driver: testAttacherName, + VolumeHandle: testVolumeHandle, + ReadOnly: false, + }, + }, + }, + } +} + +func node() *v1.Node { + return &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: testNodeName, + Annotations: map[string]string{"nodeid.csi.volume.kubernetes.io/foo_bar": "MyNodeID"}, + }, + } +} + +func TestCSIHandler(t *testing.T) { + vaGroupResourceVersion := schema.GroupVersionResource{ + Group: storagev1.GroupName, + Version: "v1", + Resource: "volumeattachments", + } + + tests := []testCase{ + // + // ATTACH + // + { + name: "VolumeAttachment added -> successful attachment", + initialObjects: []runtime.Object{pv(), node()}, + addedVa: va(false /*attached*/, "" /*finalizer*/), + expectedActions: []core.Action{ + // Finalizer is saved first + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(false /*attached*/, "attacher-csi/test")), + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, "attacher-csi/test")), + }, + expectedCSICalls: []csiCall{ + {"attach", testPVName, testNodeName, nil, nil}, + }, + }, + { + name: "VolumeAttachment updated -> successful attachment", + initialObjects: []runtime.Object{pv(), node()}, + updatedVa: va(false, ""), + expectedActions: []core.Action{ + // Finalizer is saved first + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(false /*attached*/, "attacher-csi/test")), + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, "attacher-csi/test")), + }, + expectedCSICalls: []csiCall{ + {"attach", testPVName, testNodeName, nil, nil}, + }, + }, + { + name: "already attached volume -> ignored", + initialObjects: []runtime.Object{pv(), node()}, + updatedVa: va(true, "attacher-csi/test"), + expectedActions: []core.Action{}, + expectedCSICalls: []csiCall{}, + }, + { + name: "VolumeAttachment added -> successful attachment incl. metadata", + initialObjects: []runtime.Object{pv(), node()}, + addedVa: va(false, ""), + expectedActions: []core.Action{ + // Finalizer is saved first + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(false /*attached*/, "attacher-csi/test")), + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, vaWithMetadata(va(true, "attacher-csi/test"), map[string]string{"foo": "bar"})), + }, + expectedCSICalls: []csiCall{ + {"attach", testPVName, testNodeName, nil, map[string]string{"foo": "bar"}}, + }, + }, + { + name: "unknown driver -> ignored", + initialObjects: []runtime.Object{pv(), node()}, + addedVa: vaWithInvalidDriver(va(false, "attacher-csi/test")), + expectedActions: []core.Action{}, + }, + { + name: "unknown PV -> error", + initialObjects: []runtime.Object{node()}, + addedVa: va(false, "attacher-csi/test"), + expectedActions: []core.Action{ + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, vaWithAttachError(va(false, "attacher-csi/test"), "persistentvolume \"pv1\" not found")), + }, + }, + { + name: "unknown PV -> error + error saving the error", + initialObjects: []runtime.Object{node()}, + addedVa: va(false, "attacher-csi/test"), + reactors: []reaction{ + { + verb: "update", + resource: "volumeattachments", + reactor: func(t *testing.T) core.ReactionFunc { + i := 0 + return func(core.Action) (bool, runtime.Object, error) { + i++ + if i < 3 { + // Update fails 2 times + return true, nil, apierrors.NewForbidden(storagev1.Resource("volumeattachments"), "pv1-node1", errors.New("Mock error")) + } + // Update succeeds for the 3rd time + return false, nil, nil + } + }, + }, + }, + expectedActions: []core.Action{ + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, vaWithAttachError(va(false, "attacher-csi/test"), "persistentvolume \"pv1\" not found")), + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, vaWithAttachError(va(false, "attacher-csi/test"), "persistentvolume \"pv1\" not found")), + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, vaWithAttachError(va(false, "attacher-csi/test"), "persistentvolume \"pv1\" not found")), + }, + }, + { + name: "invalid PV reference-> error", + initialObjects: []runtime.Object{pv(), node()}, + addedVa: vaWithNoPVReference(va(false, "attacher-csi/test")), + expectedActions: []core.Action{ + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, vaWithAttachError(vaWithNoPVReference(va(false, "attacher-csi/test")), "VolumeAttachment.spec.persistentVolumeName is empty")), + }, + }, + { + name: "unknown node -> error", + initialObjects: []runtime.Object{pv()}, + addedVa: va(false, "attacher-csi/test"), + expectedActions: []core.Action{ + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, vaWithAttachError(va(false, "attacher-csi/test"), "node \"node1\" not found")), + }, + }, + { + name: "failed write with finializers -> controller retries", + initialObjects: []runtime.Object{pv(), node()}, + addedVa: va(false, ""), + reactors: []reaction{ + { + verb: "update", + resource: "volumeattachments", + reactor: func(t *testing.T) core.ReactionFunc { + i := 0 + return func(core.Action) (bool, runtime.Object, error) { + i++ + if i < 3 { + // Update fails 2 times + return true, nil, apierrors.NewForbidden(storagev1.Resource("volumeattachments"), "pv1-node1", errors.New("Mock error")) + } + // Update succeeds for the 3rd time + return false, nil, nil + } + }, + }, + }, + expectedActions: []core.Action{ + // Save 2x fails + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(false /*attached*/, "attacher-csi/test")), + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(false /*attached*/, "attacher-csi/test")), + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(false /*attached*/, "attacher-csi/test")), + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, "attacher-csi/test")), + }, + expectedCSICalls: []csiCall{ + {"attach", testPVName, testNodeName, nil, nil}, + }, + }, + { + name: "failed write with attached=true -> controller retries", + initialObjects: []runtime.Object{pv(), node()}, + addedVa: va(false, ""), + reactors: []reaction{ + { + verb: "update", + resource: "volumeattachments", + reactor: func(t *testing.T) core.ReactionFunc { + i := 0 + return func(core.Action) (bool, runtime.Object, error) { + i++ + if i != 2 { + return false, nil, nil + } + return true, nil, apierrors.NewForbidden(storagev1.Resource("volumeattachments"), "pv1-node1", errors.New("mock error")) + } + }, + }, + }, + expectedActions: []core.Action{ + // Finalizer is saved first + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(false /*attached*/, "attacher-csi/test")), + // Second save with attached=true fails + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, "attacher-csi/test")), + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, "attacher-csi/test")), + }, + expectedCSICalls: []csiCall{ + {"attach", testPVName, testNodeName, nil, nil}, + {"attach", testPVName, testNodeName, nil, nil}, + }, + }, + { + name: "CSI attach fails -> controller retries", + initialObjects: []runtime.Object{pv(), node()}, + addedVa: va(false, ""), + expectedActions: []core.Action{ + // Finalizer is saved first + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(false /*attached*/, "attacher-csi/test")), + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, vaWithAttachError(va(false, "attacher-csi/test"), "mock error")), + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, "attacher-csi/test")), + }, + expectedCSICalls: []csiCall{ + {"attach", testPVName, testNodeName, fmt.Errorf("mock error"), nil}, + {"attach", testPVName, testNodeName, nil, nil}, + }, + }, + // + // DETACH + // + { + name: "VolumeAttachment marked for deletion -> successful detach", + initialObjects: []runtime.Object{pv(), node()}, + addedVa: deleted(va(true, "attacher-csi/test")), + expectedActions: []core.Action{ + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, ""))), + }, + expectedCSICalls: []csiCall{ + {"detach", testPVName, testNodeName, nil, nil}, + }, + }, + { + name: "CSI detach fails -> controller retries", + initialObjects: []runtime.Object{pv(), node()}, + addedVa: deleted(va(true, "attacher-csi/test")), + expectedActions: []core.Action{ + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, vaWithDetachError(deleted(va(true /*attached*/, "attacher-csi/test")), "mock error")), + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, ""))), + }, + expectedCSICalls: []csiCall{ + {"detach", testPVName, testNodeName, fmt.Errorf("mock error"), nil}, + {"detach", testPVName, testNodeName, nil, nil}, + }, + }, + { + name: "already detached volume -> ignored", + initialObjects: []runtime.Object{pv(), node()}, + updatedVa: deleted(va(false, "")), + expectedActions: []core.Action{}, + expectedCSICalls: []csiCall{}, + }, + { + name: "detach unknown PV -> error", + initialObjects: []runtime.Object{node()}, + addedVa: deleted(va(true, "attacher-csi/test")), + expectedActions: []core.Action{ + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(vaWithDetachError(va(true, "attacher-csi/test"), "persistentvolume \"pv1\" not found"))), + }, + }, + { + name: "detach unknown PV -> error + error saving the error", + initialObjects: []runtime.Object{node()}, + addedVa: deleted(va(true, "attacher-csi/test")), + reactors: []reaction{ + { + verb: "update", + resource: "volumeattachments", + reactor: func(t *testing.T) core.ReactionFunc { + i := 0 + return func(core.Action) (bool, runtime.Object, error) { + i++ + if i < 3 { + // Update fails 2 times + return true, nil, apierrors.NewForbidden(storagev1.Resource("volumeattachments"), "pv1-node1", errors.New("Mock error")) + } + // Update succeeds for the 3rd time + return false, nil, nil + } + }, + }, + }, + expectedActions: []core.Action{ + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(vaWithDetachError(va(true, "attacher-csi/test"), "persistentvolume \"pv1\" not found"))), + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(vaWithDetachError(va(true, "attacher-csi/test"), "persistentvolume \"pv1\" not found"))), + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(vaWithDetachError(va(true, "attacher-csi/test"), "persistentvolume \"pv1\" not found"))), + }, + }, + { + name: "detach invalid PV reference-> error", + initialObjects: []runtime.Object{pv(), node()}, + addedVa: deleted(vaWithNoPVReference(va(true, "attacher-csi/test"))), + expectedActions: []core.Action{ + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(vaWithDetachError(vaWithNoPVReference(va(true, "attacher-csi/test")), "VolumeAttachment.spec.persistentVolumeName is empty"))), + }, + }, + { + name: "detach unknown node -> error", + initialObjects: []runtime.Object{pv()}, + addedVa: deleted(va(true, "attacher-csi/test")), + expectedActions: []core.Action{ + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(vaWithDetachError(va(true, "attacher-csi/test"), "node \"node1\" not found"))), + }, + }, + { + name: "failed write with attached=false -> controller retries", + initialObjects: []runtime.Object{pv(), node()}, + addedVa: deleted(va(false, "attacher-csi/test")), + reactors: []reaction{ + { + verb: "update", + resource: "volumeattachments", + reactor: func(t *testing.T) core.ReactionFunc { + i := 0 + return func(core.Action) (bool, runtime.Object, error) { + i++ + if i == 1 { + return true, nil, apierrors.NewForbidden(storagev1.Resource("volumeattachments"), "pv1-node1", errors.New("mock error")) + } + return false, nil, nil + } + }, + }, + }, + expectedActions: []core.Action{ + // Second save with attached=true fails + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false, ""))), + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false, ""))), + }, + expectedCSICalls: []csiCall{ + {"detach", testPVName, testNodeName, nil, nil}, + {"detach", testPVName, testNodeName, nil, nil}, + }, + }, + } + + runTests(t, csiHandlerFactory, tests) +} diff --git a/pkg/controller/framework_test.go b/pkg/controller/framework_test.go index e11a81bcf..637088c4f 100644 --- a/pkg/controller/framework_test.go +++ b/pkg/controller/framework_test.go @@ -17,16 +17,23 @@ limitations under the License. package controller import ( + "context" + "fmt" "reflect" + "strings" "testing" "time" "github.com/davecgh/go-spew/spew" "github.com/golang/glog" + "github.com/kubernetes-csi/external-attacher-csi/pkg/connection" + "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" ) @@ -55,9 +62,29 @@ type testCase struct { updatedVa *storagev1.VolumeAttachment // List of expected kubeclient actions that should happen during the test. expectedActions []core.Action + // List of expected CSI calls + expectedCSICalls []csiCall + // Function to perform additional checks after the test finishes + additionalCheck func(t *testing.T, test testCase) } -func runTests(t *testing.T, tests []testCase) { +type csiCall struct { + // Name that's supposed to be called. "attach" or "detach". Other CSI calls + // are not supported for testing. + functionName string + // Expected name of the PV + pvName string + // Expected name of the node + nodeName string + // error to return + err error + // metadata to return (used only in Attach calls) + metadata map[string]string +} + +type handlerFactory func(client kubernetes.Interface, informerFactory informers.SharedInformerFactory, csi connection.CSIConnection) Handler + +func runTests(t *testing.T, handlerFactory handlerFactory, tests []testCase) { for _, test := range tests { glog.Infof("Test %q: started", test.name) objs := test.initialObjects @@ -67,26 +94,60 @@ func runTests(t *testing.T, tests []testCase) { if test.updatedVa != nil { objs = append(objs, test.updatedVa) } + + // Create client and informers client := fake.NewSimpleClientset(objs...) informers := informers.NewSharedInformerFactory(client, time.Hour /* disable resync*/) vaInformer := informers.Storage().V1().VolumeAttachments() - handler := NewTrivialHandler(client) - + pvInformer := informers.Core().V1().PersistentVolumes() + nodeInformer := informers.Core().V1().Nodes() + // Fill the informers with inital objects so controller can Get() them + for _, obj := range objs { + switch obj.(type) { + case *v1.PersistentVolume: + pvInformer.Informer().GetStore().Add(obj) + case *v1.Node: + nodeInformer.Informer().GetStore().Add(obj) + case *storagev1.VolumeAttachment: + vaInformer.Informer().GetStore().Add(obj) + default: + t.Fatalf("Unknown initalObject type: %+v", obj) + } + } + // This reactor makes sure that all updates that the controller does are + // reflected in its informers so Lister.Get() finds them. This does not + // enqueue events! + client.Fake.PrependReactor("update", "*", func(action core.Action) (bool, runtime.Object, error) { + if action.GetVerb() == "update" { + switch action.GetResource().Resource { + case "volumeattachments": + glog.V(5).Infof("Test reactor: updated VA") + vaInformer.Informer().GetStore().Update(action.(core.UpdateAction).GetObject()) + default: + t.Errorf("Unknown update resource: %s", action.GetResource()) + } + } + return false, nil, nil + }) + // Run any reactors that the test needs *before* the above one. for _, reactor := range test.reactors { client.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactor(t)) } + // Construct controller + csiConnection := &fakeCSIConnection{t: t, calls: test.expectedCSICalls} + handler := handlerFactory(client, informers, csiConnection) ctrl := NewCSIAttachController(client, testAttacherName, handler, vaInformer) + + // Start the test by enqueueing the right event if test.addedVa != nil { - vaInformer.Informer().GetStore().Add(test.addedVa) ctrl.vaAdded(test.addedVa) } if test.updatedVa != nil { - vaInformer.Informer().GetStore().Update(test.updatedVa) ctrl.vaUpdated(test.updatedVa, test.updatedVa) } - /* process the queue until we get expected results */ + // Process the queue until we get expected results timeout := time.Now().Add(10 * time.Second) lastReportedActionCount := 0 for { @@ -95,7 +156,7 @@ func runTests(t *testing.T, tests []testCase) { break } if ctrl.queue.Len() > 0 { - glog.V(4).Infof("Test %q: %d events in the queue, processing one", test.name, ctrl.queue.Len()) + glog.V(5).Infof("Test %q: %d events in the queue, processing one", test.name, ctrl.queue.Len()) ctrl.processNextWorkItem() } if ctrl.queue.Len() > 0 { @@ -105,7 +166,7 @@ func runTests(t *testing.T, tests []testCase) { currentActionCount := len(client.Actions()) if currentActionCount < len(test.expectedActions) { if lastReportedActionCount < currentActionCount { - glog.V(4).Infof("Test %q: got %d actions out of %d, waiting for the rest", test.name, currentActionCount, len(test.expectedActions)) + glog.V(5).Infof("Test %q: got %d actions out of %d, waiting for the rest", test.name, currentActionCount, len(test.expectedActions)) lastReportedActionCount = currentActionCount } // The test expected more to happen, wait for them @@ -118,13 +179,25 @@ func runTests(t *testing.T, tests []testCase) { actions := client.Actions() for i, action := range actions { if len(test.expectedActions) < i+1 { - t.Errorf("Test %q: %d unexpected actions: %+v", test.name, len(actions)-len(test.expectedActions), actions[i:]) + t.Errorf("Test %q: %d unexpected actions: %+v", test.name, len(actions)-len(test.expectedActions), spew.Sdump(actions[i:])) break } + // Sanitize time in attach/detach errors + if action.GetVerb() == "update" && action.GetResource().Resource == "volumeattachments" { + obj := action.(core.UpdateAction).GetObject() + o := obj.(*storagev1.VolumeAttachment) + if o.Status.AttachError != nil { + o.Status.AttachError.Time = metav1.Time{} + } + if o.Status.DetachError != nil { + o.Status.DetachError.Time = metav1.Time{} + } + } + expectedAction := test.expectedActions[i] if !reflect.DeepEqual(expectedAction, action) { - t.Errorf("Test %q:\nExpected:\n%s\ngot:\n%s", test.name, spew.Sdump(expectedAction), spew.Sdump(action)) + t.Errorf("Test %q: action %d\nExpected:\n%s\ngot:\n%s", test.name, i, spew.Sdump(expectedAction), spew.Sdump(action)) continue } } @@ -135,6 +208,157 @@ func runTests(t *testing.T, tests []testCase) { t.Logf(" %+v", a) } } + + if test.additionalCheck != nil { + test.additionalCheck(t, test) + } glog.Infof("Test %q: finished \n\n", test.name) } } + +// Helper function to create various objects +const ( + testAttacherName = "csi/test" + testPVName = "pv1" + testNodeName = "node1" + testVolumeHandle = "handle" +) + +func createVolumeAttachment(attacher string, pvName string, nodeName string, attached bool, finalizers string) *storagev1.VolumeAttachment { + va := &storagev1.VolumeAttachment{ + ObjectMeta: metav1.ObjectMeta{ + Name: pvName + "-" + nodeName, + }, + Spec: storagev1.VolumeAttachmentSpec{ + Attacher: attacher, + NodeName: nodeName, + AttachedVolumeSource: storagev1.AttachedVolumeSource{ + PersistentVolumeName: &pvName, + }, + }, + Status: storagev1.VolumeAttachmentStatus{ + Attached: attached, + }, + } + if len(finalizers) > 0 { + va.Finalizers = strings.Split(finalizers, ",") + } + return va +} + +func va(attached bool, finalizers string) *storagev1.VolumeAttachment { + return createVolumeAttachment(testAttacherName, testPVName, testNodeName, attached, finalizers) +} + +func deleted(va *storagev1.VolumeAttachment) *storagev1.VolumeAttachment { + va.DeletionTimestamp = &metav1.Time{} + return va +} + +func vaWithMetadata(va *storagev1.VolumeAttachment, metadata map[string]string) *storagev1.VolumeAttachment { + va.Status.AttachmentMetadata = metadata + return va +} + +func vaWithNoPVReference(va *storagev1.VolumeAttachment) *storagev1.VolumeAttachment { + va.Spec.AttachedVolumeSource.PersistentVolumeName = nil + return va +} + +func vaWithInvalidDriver(va *storagev1.VolumeAttachment) *storagev1.VolumeAttachment { + return createVolumeAttachment("unknownDriver", testPVName, testNodeName, false, "") +} + +func vaWithAttachError(va *storagev1.VolumeAttachment, message string) *storagev1.VolumeAttachment { + va.Status.AttachError = &storagev1.VolumeError{ + Message: message, + Time: metav1.Time{}, + } + return va +} + +func vaWithDetachError(va *storagev1.VolumeAttachment, message string) *storagev1.VolumeAttachment { + va.Status.DetachError = &storagev1.VolumeError{ + Message: message, + Time: metav1.Time{}, + } + return va +} + +// Fake CSIConnection implementation that check that Attach/Detach is called +// with the right parameters and it returns proper error code and metadata. +type fakeCSIConnection struct { + calls []csiCall + index int + t *testing.T +} + +func (f *fakeCSIConnection) GetDriverName(ctx context.Context) (string, error) { + return "", fmt.Errorf("Not implemented") +} + +func (f *fakeCSIConnection) SupportsControllerPublish(ctx context.Context) (bool, error) { + return false, fmt.Errorf("Not implemented") +} + +func (f *fakeCSIConnection) Attach(ctx context.Context, pv *v1.PersistentVolume, node *v1.Node) (map[string]string, error) { + if f.index >= len(f.calls) { + f.t.Errorf("Unexpected CSI Attach call: pv=%s, node=%s, index: %d, calls: %+v", pv.Name, node.Name, f.index, f.calls) + return nil, fmt.Errorf("unexpected call") + } + call := f.calls[f.index] + f.index++ + + var err error + if call.functionName != "attach" { + f.t.Errorf("Unexpected CSI Attach call: pv=%s, node=%s, expected: %s", pv.Name, node.Name, call.functionName) + err = fmt.Errorf("unexpected attach call") + } + + if call.pvName != pv.Name { + f.t.Errorf("Wrong CSI Attach call: pv=%s, node=%s, expected PV: %s", pv.Name, node.Name, call.pvName) + err = fmt.Errorf("unexpected attach call") + } + + if call.nodeName != node.Name { + f.t.Errorf("Wrong CSI Attach call: pv=%s, node=%s, expected Node: %s", pv.Name, node.Name, call.nodeName) + err = fmt.Errorf("unexpected attach call") + } + if err != nil { + return nil, err + } + return call.metadata, call.err +} + +func (f *fakeCSIConnection) Detach(ctx context.Context, pv *v1.PersistentVolume, node *v1.Node) error { + if f.index >= len(f.calls) { + f.t.Errorf("Unexpected CSI Detach call: pv=%s, node=%s, index: %d, calls: %+v", pv.Name, node.Name, f.index, f.calls) + return fmt.Errorf("unexpected call") + } + call := f.calls[f.index] + f.index++ + + var err error + if call.functionName != "detach" { + f.t.Errorf("Unexpected CSI Detach call: pv=%s, node=%s, expected: %s", pv.Name, node.Name, call.functionName) + err = fmt.Errorf("unexpected detach call") + } + + if call.pvName != pv.Name { + f.t.Errorf("Wrong CSI Attach call: pv=%s, node=%s, expected PV: %s", pv.Name, node.Name, call.pvName) + err = fmt.Errorf("unexpected detach call") + } + + if call.nodeName != node.Name { + f.t.Errorf("Wrong CSI Attach call: pv=%s, node=%s, expected Node: %s", pv.Name, node.Name, call.nodeName) + err = fmt.Errorf("unexpected detach call") + } + if err != nil { + return err + } + return call.err +} + +func (f *fakeCSIConnection) Close() error { + return fmt.Errorf("Not implemented") +} diff --git a/pkg/controller/trivial_handler.go b/pkg/controller/trivial_handler.go index d0c47e2da..9d617ada6 100644 --- a/pkg/controller/trivial_handler.go +++ b/pkg/controller/trivial_handler.go @@ -40,7 +40,7 @@ func (h *trivialHandler) SyncNewOrUpdatedVolumeAttachment(va *storagev1.VolumeAt glog.V(4).Infof("Trivial sync[%s] started", va.Name) if !va.Status.Attached { // mark as attached - if err := h.markAsAttached(va); err != nil { + if _, err := markAsAttached(h.client, va, nil); err != nil { glog.Warningf("Error saving VolumeAttachment %s as attached: %s", va.Name, err) queue.AddRateLimited(va.Name) return @@ -49,10 +49,3 @@ func (h *trivialHandler) SyncNewOrUpdatedVolumeAttachment(va *storagev1.VolumeAt } queue.Forget(va.Name) } - -func (h *trivialHandler) markAsAttached(va *storagev1.VolumeAttachment) error { - clone := va.DeepCopy() - clone.Status.Attached = true - _, err := h.client.StorageV1().VolumeAttachments().Update(clone) - return err -} diff --git a/pkg/controller/trivial_handler_test.go b/pkg/controller/trivial_handler_test.go index e395dd066..2e8c960f2 100644 --- a/pkg/controller/trivial_handler_test.go +++ b/pkg/controller/trivial_handler_test.go @@ -20,44 +20,20 @@ import ( "errors" "testing" + "github.com/kubernetes-csi/external-attacher-csi/pkg/connection" + storagev1 "k8s.io/api/storage/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" core "k8s.io/client-go/testing" ) -const ( - testAttacherName = "csi/test" - testPVName = "pv1" - testNodeName = "node1" -) - -func createVolumeAttachment(attacher string, pvName string, nodeName string, attached bool) *storagev1.VolumeAttachment { - return &storagev1.VolumeAttachment{ - ObjectMeta: metav1.ObjectMeta{ - Name: pvName + "-" + nodeName, - }, - Spec: storagev1.VolumeAttachmentSpec{ - Attacher: attacher, - NodeName: nodeName, - AttachedVolumeSource: storagev1.AttachedVolumeSource{ - PersistentVolumeName: &pvName, - }, - }, - Status: storagev1.VolumeAttachmentStatus{ - Attached: attached, - }, - } -} - -func va(attached bool) *storagev1.VolumeAttachment { - return createVolumeAttachment(testAttacherName, testPVName, testNodeName, attached) -} - -func invalidDriverVA() *storagev1.VolumeAttachment { - return createVolumeAttachment("unknownDriver", testPVName, testNodeName, false) +func trivialHandlerFactory(client kubernetes.Interface, informerFactory informers.SharedInformerFactory, csi connection.CSIConnection) Handler { + return NewTrivialHandler(client) } func TestTrivialHandler(t *testing.T) { @@ -70,26 +46,26 @@ func TestTrivialHandler(t *testing.T) { tests := []testCase{ { name: "add -> successful write", - addedVa: va(false), + addedVa: va(false, ""), expectedActions: []core.Action{ - core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true)), + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true, "")), }, }, { name: "update -> successful write", - updatedVa: va(false), + updatedVa: va(false, ""), expectedActions: []core.Action{ - core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true)), + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true, "")), }, }, { name: "unknown driver -> controller ignores", - addedVa: invalidDriverVA(), + addedVa: vaWithInvalidDriver(va(false, "")), expectedActions: []core.Action{}, }, { name: "failed write -> controller retries", - addedVa: va(false), + addedVa: va(false, ""), reactors: []reaction{ { verb: "update", @@ -109,12 +85,12 @@ func TestTrivialHandler(t *testing.T) { }, }, expectedActions: []core.Action{ - core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true)), - core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true)), - core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true)), + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true, "")), + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true, "")), + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true, "")), }, }, } - runTests(t, tests) + runTests(t, trivialHandlerFactory, tests) } diff --git a/pkg/controller/util.go b/pkg/controller/util.go new file mode 100644 index 000000000..34eb4797b --- /dev/null +++ b/pkg/controller/util.go @@ -0,0 +1,81 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "github.com/golang/glog" + storagev1 "k8s.io/api/storage/v1" + "k8s.io/client-go/kubernetes" +) + +func markAsAttached(client kubernetes.Interface, va *storagev1.VolumeAttachment, metadata map[string]string) (*storagev1.VolumeAttachment, error) { + glog.V(4).Infof("Marking as attached %q", va.Name) + clone := va.DeepCopy() + clone.Status.Attached = true + clone.Status.AttachmentMetadata = metadata + clone.Status.AttachError = nil + // TODO: use patch to save us from VersionError + newVA, err := client.StorageV1().VolumeAttachments().Update(clone) + if err != nil { + return va, err + } + glog.V(4).Infof("Marked as attached %q", va.Name) + return newVA, nil +} + +func getFinalizerName(attacher string) string { + return "attacher-" + attacher +} + +func markAsDetached(client kubernetes.Interface, va *storagev1.VolumeAttachment) (*storagev1.VolumeAttachment, error) { + finalizerName := getFinalizerName(va.Spec.Attacher) + + // Prepare new array of finalizers + newFinalizers := make([]string, 0, len(va.Finalizers)) + found := false + for _, f := range va.Finalizers { + if f == finalizerName { + found = true + continue + } + newFinalizers = append(newFinalizers, f) + } + // Mostly to simplify unit tests, but it won't harm in production too + if len(newFinalizers) == 0 { + newFinalizers = nil + } + + if !found && !va.Status.Attached { + // Finalizer was not present, nothing to update + glog.V(4).Infof("Already fully detached %q", va.Name) + return va, nil + } + + glog.V(4).Infof("Marking as detached %q", va.Name) + clone := va.DeepCopy() + clone.Finalizers = newFinalizers + clone.Status.Attached = false + clone.Status.DetachError = nil + clone.Status.AttachmentMetadata = nil + // TODO: use patch to save us from VersionError + newVA, err := client.StorageV1().VolumeAttachments().Update(clone) + if err != nil { + return va, err + } + glog.V(4).Infof("Finalizer removed from %q", va.Name) + return newVA, nil +}