From 4e1dd517a0a2cddafe6d8ca1f58c2579bf29cfbd Mon Sep 17 00:00:00 2001 From: Sung Jun Park Date: Tue, 26 Apr 2022 21:08:18 +0000 Subject: [PATCH] chore: DeleteVolume should wait for replicas to be fully deleted instead of requeuing --- pkg/azureconstants/azure_constants.go | 1 + pkg/controller/azvolume.go | 118 ++++++++++++++---- pkg/controller/azvolume_test.go | 31 +++-- pkg/controller/common.go | 5 +- pkg/provisioner/crdprovisioner.go | 56 ++++----- pkg/provisioner/crdprovisioner_test.go | 5 +- .../conditionwaiter.go | 18 +-- .../conditionwatcher.go | 42 ++++--- 8 files changed, 182 insertions(+), 94 deletions(-) rename pkg/{provisioner => watcher}/conditionwaiter.go (87%) rename pkg/{provisioner => watcher}/conditionwatcher.go (80%) diff --git a/pkg/azureconstants/azure_constants.go b/pkg/azureconstants/azure_constants.go index 975a910d80..20ff132211 100644 --- a/pkg/azureconstants/azure_constants.go +++ b/pkg/azureconstants/azure_constants.go @@ -124,6 +124,7 @@ const ( CRIUpdateRetryDuration = time.Duration(1) * time.Second CRIUpdateRetryFactor = 3.0 CRIUpdateRetryStep = 5 + DefaultInformerResync = time.Duration(30) * time.Second ZonedField = "zoned" NormalUpdateMaxNetRetry = 0 ForcedUpdateMaxNetRetry = 5 diff --git a/pkg/controller/azvolume.go b/pkg/controller/azvolume.go index 230fd30ff0..ed9d1ac4e2 100644 --- a/pkg/controller/azvolume.go +++ b/pkg/controller/azvolume.go @@ -34,6 +34,7 @@ import ( diskv1beta1 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta1" "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils" util "sigs.k8s.io/azuredisk-csi-driver/pkg/util" + "sigs.k8s.io/azuredisk-csi-driver/pkg/watcher" consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -208,16 +209,6 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *diskv1b } }() - // Delete all AzVolumeAttachment objects bound to the deleted AzVolume - attachments, err := r.controllerSharedState.cleanUpAzVolumeAttachmentByVolume(ctx, azVolume.Name, azvolume, all, mode) - if err != nil { - return err - } - - if len(attachments) > 0 { - return status.Errorf(codes.Aborted, "volume deletion requeued until attached azVolumeAttachments are entirely detached...") - } - // only try deleting underlying volume 1) if volume creation was successful and 2) volumeDeleteRequestAnnotation is present // if the annotation is not present, only delete the CRI and not the underlying volume if isCreated(azVolume) && volumeDeleteRequested { @@ -239,31 +230,108 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *diskv1b } go func() { - cloudCtx, cloudCancel := context.WithTimeout(context.Background(), cloudTimeout) - defer cloudCancel() + deleteCtx, deleteCancel := context.WithTimeout(context.Background(), cloudTimeout) + defer deleteCancel() + + reportError := func(obj interface{}, err error) error { + azv := obj.(*diskv1beta1.AzVolume) + _ = r.updateError(azv, err) + _, derr := r.updateState(azv, diskv1beta1.VolumeDeletionFailed, forceUpdate) + return derr + } var updateFunc func(interface{}) error + var err error updateMode := azureutils.UpdateCRIStatus - err := r.deleteVolume(cloudCtx, azVolume) + + // Delete all AzVolumeAttachment objects bound to the deleted AzVolume + var attachments []diskv1beta1.AzVolumeAttachment + attachments, err = r.controllerSharedState.cleanUpAzVolumeAttachmentByVolume(deleteCtx, azVolume.Name, azvolume, all, mode) if err != nil { - klog.Errorf("failed to delete volume (%s): %v", 
azVolume.Spec.VolumeName, err)
 			updateFunc = func(obj interface{}) error {
-				azv := obj.(*diskv1beta1.AzVolume)
-				azv = r.updateError(azv, err)
-				_, derr := r.updateState(azv, diskv1beta1.VolumeDeletionFailed, forceUpdate)
-				return derr
+				return reportError(obj, err)
 			}
 		} else {
-			klog.Infof("successfully deleted volume (%s)", azVolume.Spec.VolumeName)
-			updateFunc = func(obj interface{}) error {
-				azv := obj.(*diskv1beta1.AzVolume)
-				azv = r.deleteFinalizer(azv, map[string]bool{consts.AzVolumeFinalizer: true})
-				_, derr := r.updateState(azv, diskv1beta1.VolumeDeleted, forceUpdate)
-				return derr
+			var wg sync.WaitGroup
+			errors := make([]error, len(attachments))
+			numErrors := uint32(0)
+
+			verifyDeleted := func(obj interface{}, objectDeleted bool) (bool, error) {
+				if obj == nil || objectDeleted {
+					return true, nil
+				}
+
+				// otherwise, the volume detachment has either failed or is still pending
+				azVolumeAttachmentInstance := obj.(*diskv1beta1.AzVolumeAttachment)
+				if azVolumeAttachmentInstance.Status.Error != nil {
+					return false, util.ErrorFromAzError(azVolumeAttachmentInstance.Status.Error)
+				}
+				return false, nil
+			}
+
+			// start waiting for replica AzVolumeAttachment CRIs to be deleted
+			for i, attachment := range attachments {
+				var waiter *watcher.ConditionWaiter
+				if waiter, err = r.controllerSharedState.conditionWatcher.NewConditionWaiter(deleteCtx, watcher.AzVolumeAttachmentType, attachment.Name, verifyDeleted); err != nil {
+					updateFunc = func(obj interface{}) error {
+						return reportError(obj, err)
+					}
+					break
+				}
+
+				// wait asynchronously, recording any failure in the errors slice
+				wg.Add(1)
+				go func(ctx context.Context, waiter *watcher.ConditionWaiter, i int) {
+					defer waiter.Close()
+					defer wg.Done()
+					_, err := waiter.Wait(ctx)
+					if err != nil {
+						errors[i] = err
+						atomic.AddUint32(&numErrors, 1)
+					}
+				}(deleteCtx, waiter, i)
+			}
+
+			wg.Wait()
+
+			// if any wait call failed, aggregate the error messages and report them via the CRI
+			if numErrors > 0 {
+				var errMsgs []string
+				for i, derr := range errors {
+					if derr != nil {
+						errMsgs = append(errMsgs, fmt.Sprintf("%s: %v", attachments[i].Name, derr))
+					}
+				}
+				err = status.Error(codes.Internal, strings.Join(errMsgs, ", "))
+				updateFunc = func(obj interface{}) error {
+					return reportError(obj, err)
+				}
 			}
-			updateMode = azureutils.UpdateAll
+		}
+
+		if err == nil {
+			cloudCtx, cloudCancel := context.WithTimeout(context.Background(), cloudTimeout)
+			defer cloudCancel()
+
+			err = r.deleteVolume(cloudCtx, azVolume)
+			if err != nil {
+				klog.Errorf("failed to delete volume (%s): %v", azVolume.Spec.VolumeName, err)
+				updateFunc = func(obj interface{}) error {
+					return reportError(obj, err)
+				}
+			} else {
+				klog.Infof("successfully deleted volume (%s)", azVolume.Spec.VolumeName)
+				updateFunc = func(obj interface{}) error {
+					azv := obj.(*diskv1beta1.AzVolume)
+					azv = r.deleteFinalizer(azv, map[string]bool{consts.AzVolumeFinalizer: true})
+					_, derr := r.updateState(azv, diskv1beta1.VolumeDeleted, forceUpdate)
+					return derr
+				}
+				updateMode = azureutils.UpdateAll
+			}
 		}
+		// UpdateCRIWithRetry should be called with a context that has no timeout when invoked from a separate goroutine: the reconciliation will not be retriggered, and a timed-out update would leave the CRI in an unrecoverable transient state.
 		updateCtx := context.Background()
 		_ = azureutils.UpdateCRIWithRetry(updateCtx, nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolume, updateFunc, consts.ForcedUpdateMaxNetRetry, updateMode)
diff --git a/pkg/controller/azvolume_test.go b/pkg/controller/azvolume_test.go
index ccd2a60982..e469474b27 100644
--- a/pkg/controller/azvolume_test.go
+++ b/pkg/controller/azvolume_test.go
@@ -20,12 +20,14 @@ import (
 	"context"
 	"sync"
 	"testing"
-	"time"
 
 	"github.com/golang/mock/gomock"
 	"github.com/stretchr/testify/require"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/selection"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/wait"
 	fakev1 "k8s.io/client-go/kubernetes/fake"
 	diskv1beta1 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta1"
@@ -34,6 +36,7 @@ import (
 	"sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
 	"sigs.k8s.io/azuredisk-csi-driver/pkg/controller/mockclient"
 	"sigs.k8s.io/azuredisk-csi-driver/pkg/controller/mockvolumeprovisioner"
+	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 )
 
@@ -216,7 +219,7 @@ func TestAzVolumeControllerReconcile(t *testing.T) {
 			},
 		},
 		{
-			description: "[Failure] Should delete volume attachments and requeue when AzVolume is marked for deletion.",
+			description: "[Success] Should delete replica volume attachments and then delete the AzVolume",
 			request:     testAzVolume0Request,
 			setupFunc: func(t *testing.T, mockCtl *gomock.Controller) *ReconcileAzVolume {
 				azVolume := testAzVolume0.DeepCopy()
@@ -237,7 +240,6 @@ func TestAzVolumeControllerReconcile(t *testing.T) {
 					mockCtl,
 					testNamespace,
 					azVolume,
-					&testPrimaryAzVolumeAttachment0,
 					&testReplicaAzVolumeAttachment)
 
 				mockClientsAndVolumeProvisioner(controller)
@@ -246,14 +248,23 @@ func TestAzVolumeControllerReconcile(t *testing.T) {
 			},
 			verifyFunc: func(t *testing.T, controller *ReconcileAzVolume, result reconcile.Result, err error) {
 				require.NoError(t, err)
-				require.Greater(t, result.RequeueAfter, time.Duration(0))
-
-				azVolume, err := controller.controllerSharedState.azClient.DiskV1beta1().AzVolumes(testNamespace).Get(context.TODO(), testPersistentVolume0Name, metav1.GetOptions{})
+				req, err := azureutils.CreateLabelRequirements(consts.VolumeNameLabel, selection.Equals, testPersistentVolume0Name)
+				require.NoError(t, err)
+				labelSelector := labels.NewSelector().Add(*req)
+				checkAzVolumeAttachmentDeletion := func() (bool, error) {
+					var attachments diskv1beta1.AzVolumeAttachmentList
+					err := controller.controllerSharedState.cachedClient.List(context.Background(), &attachments, &client.ListOptions{LabelSelector: labelSelector})
+					return len(attachments.Items) == 0, err
+				}
+				err = wait.PollImmediate(verifyCRIInterval, verifyCRITimeout, checkAzVolumeAttachmentDeletion)
+				require.NoError(t, err)
+				checkAzVolumeDeletion := func() (bool, error) {
+					var azVolume diskv1beta1.AzVolume
+					err := controller.controllerSharedState.cachedClient.Get(context.Background(), types.NamespacedName{Namespace: controller.controllerSharedState.objectNamespace, Name: testPersistentVolume0Name}, &azVolume)
+					return azVolume.Status.State == diskv1beta1.VolumeDeleted, err
+				}
+				err = wait.PollImmediate(verifyCRIInterval, verifyCRITimeout, checkAzVolumeDeletion)
 				require.NoError(t, err)
-				require.Equal(t, diskv1beta1.VolumeCreated, azVolume.Status.State)
-
-				azVolumeAttachments, _ := 
controller.controllerSharedState.azClient.DiskV1beta1().AzVolumeAttachments(testNamespace).List(context.TODO(), metav1.ListOptions{}) - require.Len(t, azVolumeAttachments.Items, 0) }, }, } diff --git a/pkg/controller/common.go b/pkg/controller/common.go index ddd43645e0..9512b26930 100644 --- a/pkg/controller/common.go +++ b/pkg/controller/common.go @@ -44,8 +44,10 @@ import ( "k8s.io/klog/v2" diskv1beta1 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta1" azClientSet "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/client/clientset/versioned" + azurediskInformers "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/client/informers/externalversions" consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants" "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils" + "sigs.k8s.io/azuredisk-csi-driver/pkg/watcher" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes" @@ -266,10 +268,11 @@ type SharedState struct { cachedClient client.Client azClient azClientSet.Interface kubeClient kubernetes.Interface + conditionWatcher *watcher.ConditionWatcher } func NewSharedState(driverName, objectNamespace, topologyKey string, eventRecorder record.EventRecorder, cachedClient client.Client, azClient azClientSet.Interface, kubeClient kubernetes.Interface) *SharedState { - newSharedState := &SharedState{driverName: driverName, objectNamespace: objectNamespace, topologyKey: topologyKey, eventRecorder: eventRecorder, cachedClient: cachedClient, azClient: azClient, kubeClient: kubeClient} + newSharedState := &SharedState{driverName: driverName, objectNamespace: objectNamespace, topologyKey: topologyKey, eventRecorder: eventRecorder, cachedClient: cachedClient, azClient: azClient, kubeClient: kubeClient, conditionWatcher: watcher.NewConditionWatcher(context.Background(), azClient, azurediskInformers.NewSharedInformerFactory(azClient, consts.DefaultInformerResync), objectNamespace)} newSharedState.createReplicaRequestsQueue() return newSharedState } diff --git a/pkg/provisioner/crdprovisioner.go b/pkg/provisioner/crdprovisioner.go index 2659631d0b..0fc6ddfecb 100644 --- a/pkg/provisioner/crdprovisioner.go +++ b/pkg/provisioner/crdprovisioner.go @@ -42,18 +42,18 @@ import ( consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants" "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils" "sigs.k8s.io/azuredisk-csi-driver/pkg/util" + "sigs.k8s.io/azuredisk-csi-driver/pkg/watcher" ) type CrdProvisioner struct { azDiskClient azDiskClientSet.Interface namespace string - conditionWatcher *conditionWatcher + conditionWatcher *watcher.ConditionWatcher } const ( // TODO: Figure out good interval and timeout values, and make them configurable. 
- interval = time.Duration(1) * time.Second - informerResync = time.Duration(30) * time.Second + interval = time.Duration(1) * time.Second ) func NewCrdProvisioner(kubeConfig *rest.Config, objNamespace string) (*CrdProvisioner, error) { @@ -62,12 +62,12 @@ func NewCrdProvisioner(kubeConfig *rest.Config, objNamespace string) (*CrdProvis return nil, err } - informerFactory := azurediskInformers.NewSharedInformerFactory(diskClient, informerResync) + informerFactory := azurediskInformers.NewSharedInformerFactory(diskClient, consts.DefaultInformerResync) return &CrdProvisioner{ azDiskClient: diskClient, namespace: objNamespace, - conditionWatcher: newConditionWatcher(context.Background(), diskClient, informerFactory, objNamespace), + conditionWatcher: watcher.NewConditionWatcher(context.Background(), diskClient, informerFactory, objNamespace), }, nil } @@ -144,7 +144,7 @@ func (c *CrdProvisioner) CreateVolume( volumeContentSource *diskv1beta1.ContentVolumeSource, accessibilityReq *diskv1beta1.TopologyRequirement) (*diskv1beta1.AzVolumeStatusDetail, error) { - lister := c.conditionWatcher.informerFactory.Disk().V1beta1().AzVolumes().Lister().AzVolumes(c.namespace) + lister := c.conditionWatcher.InformerFactory().Disk().V1beta1().AzVolumes().Lister().AzVolumes(c.namespace) azVolumeClient := c.azDiskClient.DiskV1beta1().AzVolumes(c.namespace) _, maxMountReplicaCount := azureutils.GetMaxSharesAndMaxMountReplicaCount(parameters, azureutils.HasMultiNodeAzVolumeCapabilityAccessMode(volumeCapabilities)) @@ -154,7 +154,7 @@ func (c *CrdProvisioner) CreateVolume( volumeName = azureutils.CreateValidDiskName(volumeName, true) azVolumeName := strings.ToLower(volumeName) - waiter, err := c.conditionWatcher.newConditionWaiter(ctx, azVolumeType, azVolumeName, func(obj interface{}, _ bool) (bool, error) { + waiter, err := c.conditionWatcher.NewConditionWaiter(ctx, watcher.AzVolumeType, azVolumeName, func(obj interface{}, _ bool) (bool, error) { if obj == nil { return false, nil } @@ -207,7 +207,7 @@ func (c *CrdProvisioner) CreateVolume( return nil } - if err = azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.informerFactory, nil, c.azDiskClient, azVolumeInstance, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateAll); err != nil { + if err = azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.InformerFactory(), nil, c.azDiskClient, azVolumeInstance, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateAll); err != nil { return nil, err } // if the error was caused by errors other than IsNotFound, return failure @@ -288,7 +288,7 @@ func (c *CrdProvisioner) DeleteVolume(ctx context.Context, volumeID string, secr // return the AzVolume name to the caller as the volume ID. To make this work, we would need to implement // snapshot APIs through the CRD provisioner. // Replace them in all instances in this file. 
- lister := c.conditionWatcher.informerFactory.Disk().V1beta1().AzVolumes().Lister().AzVolumes(c.namespace) + lister := c.conditionWatcher.InformerFactory().Disk().V1beta1().AzVolumes().Lister().AzVolumes(c.namespace) azVolumeClient := c.azDiskClient.DiskV1beta1().AzVolumes(c.namespace) volumeName, err := azureutils.GetDiskName(volumeID) @@ -298,7 +298,7 @@ func (c *CrdProvisioner) DeleteVolume(ctx context.Context, volumeID string, secr } azVolumeName := strings.ToLower(volumeName) - waiter, err := c.conditionWatcher.newConditionWaiter(ctx, azVolumeType, azVolumeName, func(obj interface{}, objectDeleted bool) (bool, error) { + waiter, err := c.conditionWatcher.NewConditionWaiter(ctx, watcher.AzVolumeType, azVolumeName, func(obj interface{}, objectDeleted bool) (bool, error) { // if no object is found, object is deleted if obj == nil || objectDeleted { return true, nil @@ -352,7 +352,7 @@ func (c *CrdProvisioner) DeleteVolume(ctx context.Context, volumeID string, secr } // update AzVolume CRI with annotation and reset state with retry upon conflict - if err := azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.informerFactory, nil, c.azDiskClient, azVolumeInstance, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil { + if err := azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.InformerFactory(), nil, c.azDiskClient, azVolumeInstance, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil { return err } @@ -382,9 +382,9 @@ func (c *CrdProvisioner) PublishVolume( volumeContext map[string]string, ) (map[string]string, error) { - azVALister := c.conditionWatcher.informerFactory.Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(c.namespace) - nodeLister := c.conditionWatcher.informerFactory.Disk().V1beta1().AzDriverNodes().Lister().AzDriverNodes(c.namespace) - volumeLister := c.conditionWatcher.informerFactory.Disk().V1beta1().AzVolumes().Lister().AzVolumes(c.namespace) + azVALister := c.conditionWatcher.InformerFactory().Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(c.namespace) + nodeLister := c.conditionWatcher.InformerFactory().Disk().V1beta1().AzDriverNodes().Lister().AzDriverNodes(c.namespace) + volumeLister := c.conditionWatcher.InformerFactory().Disk().V1beta1().AzVolumes().Lister().AzVolumes(c.namespace) azVAClient := c.azDiskClient.DiskV1beta1().AzVolumeAttachments(c.namespace) volumeName, err := azureutils.GetDiskName(volumeID) @@ -523,13 +523,13 @@ func (c *CrdProvisioner) PublishVolume( } updateMode = azureutils.UpdateAll } - err = azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.informerFactory, nil, c.azDiskClient, attachmentObj, updateFunc, consts.NormalUpdateMaxNetRetry, updateMode) + err = azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.InformerFactory(), nil, c.azDiskClient, attachmentObj, updateFunc, consts.NormalUpdateMaxNetRetry, updateMode) return publishContext, err } func (c *CrdProvisioner) WaitForAttach(ctx context.Context, volumeID, nodeID string) (*diskv1beta1.AzVolumeAttachment, error) { - lister := c.conditionWatcher.informerFactory.Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(c.namespace) + lister := c.conditionWatcher.InformerFactory().Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(c.namespace) volumeName, err := azureutils.GetDiskName(volumeID) if err != nil { @@ -538,7 +538,7 @@ func (c *CrdProvisioner) WaitForAttach(ctx context.Context, volumeID, nodeID str volumeName = strings.ToLower(volumeName) 
attachmentName := azureutils.GetAzVolumeAttachmentName(volumeName, nodeID) - waiter, err := c.conditionWatcher.newConditionWaiter(ctx, azVolumeAttachmentType, attachmentName, func(obj interface{}, _ bool) (bool, error) { + waiter, err := c.conditionWatcher.NewConditionWaiter(ctx, watcher.AzVolumeAttachmentType, attachmentName, func(obj interface{}, _ bool) (bool, error) { if obj == nil { return false, nil } @@ -575,7 +575,7 @@ func (c *CrdProvisioner) WaitForAttach(ctx context.Context, volumeID, nodeID str updateInstance.Status.Error = nil return nil } - if err = azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.informerFactory, nil, c.azDiskClient, azVolumeAttachmentInstance, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil { + if err = azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.InformerFactory(), nil, c.azDiskClient, azVolumeAttachmentInstance, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil { return nil, err } } @@ -602,7 +602,7 @@ func (c *CrdProvisioner) UnpublishVolume( mode consts.UnpublishMode) error { azVAClient := c.azDiskClient.DiskV1beta1().AzVolumeAttachments(c.namespace) - azVALister := c.conditionWatcher.informerFactory.Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(c.namespace) + azVALister := c.conditionWatcher.InformerFactory().Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(c.namespace) volumeName, err := azureutils.GetDiskName(volumeID) if err != nil { @@ -634,7 +634,7 @@ func (c *CrdProvisioner) shouldDemote(volumeName string, mode consts.UnpublishMo if mode == consts.Detach { return false, nil } - azVolumeInstance, err := c.conditionWatcher.informerFactory.Disk().V1beta1().AzVolumes().Lister().AzVolumes(c.namespace).Get(volumeName) + azVolumeInstance, err := c.conditionWatcher.InformerFactory().Disk().V1beta1().AzVolumes().Lister().AzVolumes(c.namespace).Get(volumeName) if err != nil { return false, err } @@ -658,7 +658,7 @@ func (c *CrdProvisioner) demoteVolume(ctx context.Context, azVAClient v1beta1.Az updateInstance.Labels[consts.RoleChangeLabel] = consts.Demoted return nil } - return azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.informerFactory, nil, c.azDiskClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateAll) + return azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.InformerFactory(), nil, c.azDiskClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateAll) } func (c *CrdProvisioner) detachVolume(ctx context.Context, azVAClient v1beta1.AzVolumeAttachmentInterface, azVolumeAttachment *diskv1beta1.AzVolumeAttachment) error { @@ -703,7 +703,7 @@ func (c *CrdProvisioner) detachVolume(ctx context.Context, azVAClient v1beta1.Az } klog.Infof("Requesting AzVolumeAttachment (%s) deletion", attachmentName) - if err = azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.informerFactory, nil, c.azDiskClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil { + if err = azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.InformerFactory(), nil, c.azDiskClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil { return err } @@ -723,7 +723,7 @@ func (c *CrdProvisioner) detachVolume(ctx context.Context, azVAClient v1beta1.Az } func (c *CrdProvisioner) WaitForDetach(ctx context.Context, volumeID, nodeID string) error { - lister := 
c.conditionWatcher.informerFactory.Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(c.namespace) + lister := c.conditionWatcher.InformerFactory().Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(c.namespace) volumeName, err := azureutils.GetDiskName(volumeID) if err != nil { @@ -733,7 +733,7 @@ func (c *CrdProvisioner) WaitForDetach(ctx context.Context, volumeID, nodeID str attachmentName := azureutils.GetAzVolumeAttachmentName(volumeName, nodeID) - waiter, err := c.conditionWatcher.newConditionWaiter(ctx, azVolumeAttachmentType, attachmentName, func(obj interface{}, objectDeleted bool) (bool, error) { + waiter, err := c.conditionWatcher.NewConditionWaiter(ctx, watcher.AzVolumeAttachmentType, attachmentName, func(obj interface{}, objectDeleted bool) (bool, error) { // if no object is found, return if obj == nil || objectDeleted { return true, nil @@ -767,7 +767,7 @@ func (c *CrdProvisioner) ExpandVolume( capacityRange *diskv1beta1.CapacityRange, secrets map[string]string) (*diskv1beta1.AzVolumeStatusDetail, error) { - lister := c.conditionWatcher.informerFactory.Disk().V1beta1().AzVolumes().Lister().AzVolumes(c.namespace) + lister := c.conditionWatcher.InformerFactory().Disk().V1beta1().AzVolumes().Lister().AzVolumes(c.namespace) volumeName, err := azureutils.GetDiskName(volumeID) if err != nil { @@ -775,7 +775,7 @@ func (c *CrdProvisioner) ExpandVolume( } azVolumeName := strings.ToLower(volumeName) - waiter, err := c.conditionWatcher.newConditionWaiter(ctx, azVolumeType, azVolumeName, func(obj interface{}, _ bool) (bool, error) { + waiter, err := c.conditionWatcher.NewConditionWaiter(ctx, watcher.AzVolumeType, azVolumeName, func(obj interface{}, _ bool) (bool, error) { if obj == nil { return false, nil } @@ -827,7 +827,7 @@ func (c *CrdProvisioner) ExpandVolume( return nil, status.Errorf(codes.Internal, "unexpected expand volume request: volume is currently in %s state", azVolume.Status.State) } - if err := azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.informerFactory, nil, c.azDiskClient, azVolume, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRI); err != nil { + if err := azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.InformerFactory(), nil, c.azDiskClient, azVolume, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRI); err != nil { return nil, err } @@ -856,7 +856,7 @@ func (c *CrdProvisioner) GetAzVolumeAttachment(ctx context.Context, volumeID str azVolumeAttachmentName := azureutils.GetAzVolumeAttachmentName(diskName, nodeID) var azVolumeAttachment *diskv1beta1.AzVolumeAttachment - if azVolumeAttachment, err = c.conditionWatcher.informerFactory.Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(c.namespace).Get(azVolumeAttachmentName); err != nil { + if azVolumeAttachment, err = c.conditionWatcher.InformerFactory().Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(c.namespace).Get(azVolumeAttachmentName); err != nil { return nil, err } diff --git a/pkg/provisioner/crdprovisioner_test.go b/pkg/provisioner/crdprovisioner_test.go index 5d41efc1c5..fc5a898b3e 100644 --- a/pkg/provisioner/crdprovisioner_test.go +++ b/pkg/provisioner/crdprovisioner_test.go @@ -37,6 +37,7 @@ import ( azurediskInformers "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/client/informers/externalversions" consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants" "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils" + "sigs.k8s.io/azuredisk-csi-driver/pkg/watcher" ) const ( @@ -147,14 +148,14 @@ func 
NewTestCrdProvisioner(controller *gomock.Controller) *CrdProvisioner { return &CrdProvisioner{ azDiskClient: fakeDiskClient, namespace: testNameSpace, - conditionWatcher: newConditionWatcher(context.Background(), fakeDiskClient, informerFactory, testNameSpace), + conditionWatcher: watcher.NewConditionWatcher(context.Background(), fakeDiskClient, informerFactory, testNameSpace), } } func UpdateTestCrdProvisionerWithNewClient(provisioner *CrdProvisioner, azDiskClient azDiskClientSet.Interface) { informerFactory := azurediskInformers.NewSharedInformerFactory(azDiskClient, testResync) provisioner.azDiskClient = azDiskClient - provisioner.conditionWatcher = newConditionWatcher(context.Background(), azDiskClient, informerFactory, testNameSpace) + provisioner.conditionWatcher = watcher.NewConditionWatcher(context.Background(), azDiskClient, informerFactory, testNameSpace) } func TestCrdProvisionerCreateVolume(t *testing.T) { diff --git a/pkg/provisioner/conditionwaiter.go b/pkg/watcher/conditionwaiter.go similarity index 87% rename from pkg/provisioner/conditionwaiter.go rename to pkg/watcher/conditionwaiter.go index d752299d1e..4f1a261dac 100644 --- a/pkg/provisioner/conditionwaiter.go +++ b/pkg/watcher/conditionwaiter.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package provisioner +package watcher import ( "context" @@ -23,23 +23,23 @@ import ( "k8s.io/klog/v2" ) -type conditionWaiter struct { - objType objectType +type ConditionWaiter struct { + objType ObjectType objName string entry *waitEntry - watcher *conditionWatcher + watcher *ConditionWatcher } -func (w *conditionWaiter) Wait(ctx context.Context) (runtime.Object, error) { +func (w *ConditionWaiter) Wait(ctx context.Context) (runtime.Object, error) { var obj runtime.Object var err error namespace := w.watcher.namespace switch w.objType { - case azVolumeType: + case AzVolumeType: obj, err = w.watcher.informerFactory.Disk().V1beta1().AzVolumes().Lister().AzVolumes(namespace).Get(w.objName) - case azVolumeAttachmentType: + case AzVolumeAttachmentType: obj, err = w.watcher.informerFactory.Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(namespace).Get(w.objName) - case azDriverNodeType: + case AzDriverNodeType: obj, err = w.watcher.informerFactory.Disk().V1beta1().AzDriverNodes().Lister().AzDriverNodes(namespace).Get(w.objName) } @@ -63,6 +63,6 @@ func (w *conditionWaiter) Wait(ctx context.Context) (runtime.Object, error) { } } -func (w *conditionWaiter) Close() { +func (w *ConditionWaiter) Close() { w.watcher.waitMap.Delete(getTypedName(w.objType, w.objName)) } diff --git a/pkg/provisioner/conditionwatcher.go b/pkg/watcher/conditionwatcher.go similarity index 80% rename from pkg/provisioner/conditionwatcher.go rename to pkg/watcher/conditionwatcher.go index bdfc4d69f0..f0304b2c9b 100644 --- a/pkg/provisioner/conditionwatcher.go +++ b/pkg/watcher/conditionwatcher.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package provisioner +package watcher import ( "context" @@ -35,12 +35,12 @@ import ( azurediskInformers "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/client/informers/externalversions" ) -type objectType string +type ObjectType string const ( - azVolumeAttachmentType objectType = "azvolumeattachments" - azVolumeType objectType = "azvolume" - azDriverNodeType objectType = "azdrivernode" + AzVolumeAttachmentType ObjectType = "azvolumeattachments" + AzVolumeType ObjectType = "azvolume" + AzDriverNodeType ObjectType = "azdrivernode" ) type eventType int @@ -61,18 +61,18 @@ type waitEntry struct { waitChan chan waitResult } -type conditionWatcher struct { +type ConditionWatcher struct { informerFactory azurediskInformers.SharedInformerFactory waitMap sync.Map // maps namespaced name to waitEntry namespace string } -func newConditionWatcher(ctx context.Context, azDiskClient azDiskClientSet.Interface, informerFactory azurediskInformers.SharedInformerFactory, namespace string) *conditionWatcher { +func NewConditionWatcher(ctx context.Context, azDiskClient azDiskClientSet.Interface, informerFactory azurediskInformers.SharedInformerFactory, namespace string) *ConditionWatcher { azVolumeAttachmentInformer := informerFactory.Disk().V1beta1().AzVolumeAttachments().Informer() azVolumeInformer := informerFactory.Disk().V1beta1().AzVolumes().Informer() azDriverNodeInformer := informerFactory.Disk().V1beta1().AzDriverNodes().Informer() - c := conditionWatcher{ + c := ConditionWatcher{ informerFactory: informerFactory, waitMap: sync.Map{}, } @@ -104,7 +104,11 @@ func newConditionWatcher(ctx context.Context, azDiskClient azDiskClientSet.Inter return &c } -func (c *conditionWatcher) newConditionWaiter(ctx context.Context, objType objectType, objName string, conditionFunc func(obj interface{}, expectDelete bool) (bool, error)) (*conditionWaiter, error) { +func (c *ConditionWatcher) InformerFactory() azurediskInformers.SharedInformerFactory { + return c.informerFactory +} + +func (c *ConditionWatcher) NewConditionWaiter(ctx context.Context, objType ObjectType, objName string, conditionFunc func(obj interface{}, expectDelete bool) (bool, error)) (*ConditionWaiter, error) { klog.V(5).Infof("Adding a condition function for %s (%s)", objType, objName) entry := waitEntry{ conditionFunc: conditionFunc, @@ -118,7 +122,7 @@ func (c *conditionWatcher) newConditionWaiter(ctx context.Context, objType objec return nil, err } - return &conditionWaiter{ + return &ConditionWaiter{ objType: objType, objName: objName, entry: &entry, @@ -126,33 +130,33 @@ func (c *conditionWatcher) newConditionWaiter(ctx context.Context, objType objec }, nil } -func (c *conditionWatcher) onCreate(obj interface{}) { +func (c *ConditionWatcher) onCreate(obj interface{}) { c.handleEvent(obj, create) } -func (c *conditionWatcher) onUpdate(_, newObj interface{}) { +func (c *ConditionWatcher) onUpdate(_, newObj interface{}) { c.handleEvent(newObj, update) } -func (c *conditionWatcher) onDelete(obj interface{}) { +func (c *ConditionWatcher) onDelete(obj interface{}) { c.handleEvent(obj, delete) } -func (c *conditionWatcher) handleEvent(obj interface{}, eventType eventType) { +func (c *ConditionWatcher) handleEvent(obj interface{}, eventType eventType) { metaObj, err := meta.Accessor(obj) if err != nil { // this line should not be reached klog.Errorf("object (%v) has not implemented meta object interface.") } - var objType objectType + var objType ObjectType switch obj.(type) { case *diskv1beta1.AzVolume: - objType = azVolumeType + objType = 
AzVolumeType case *diskv1beta1.AzVolumeAttachment: - objType = azVolumeAttachmentType + objType = AzVolumeAttachmentType case *diskv1beta1.AzDriverNode: - objType = azDriverNodeType + objType = AzDriverNodeType default: // unknown object type klog.Errorf("unsupported object type %v", reflect.TypeOf(obj)) @@ -193,6 +197,6 @@ func (c *conditionWatcher) handleEvent(obj interface{}, eventType eventType) { } } -func getTypedName(objType objectType, objName string) string { +func getTypedName(objType ObjectType, objName string) string { return fmt.Sprintf("%s/%s", string(objType), objName) }
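
A note on the mechanism for readers of this patch: it centers on the ConditionWatcher/ConditionWaiter pair moved to pkg/watcher — informer callbacks (onCreate/onUpdate/onDelete) fan events into per-object wait entries so that triggerDelete can block until every replica AzVolumeAttachment is verifiably gone, instead of requeuing the reconcile. The following minimal, self-contained Go sketch distills that pattern; every name in it (conditionWatcher, newWaiter, handleEvent, waitResult) is an illustrative stand-in, not the driver's exported API.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitResult mirrors the patch's waitEntry/waitChan idea: an event handler
// checks a condition and, once it holds (or fails), signals the waiter.
type waitResult struct {
	obj interface{}
	err error
}

type conditionWatcher struct {
	waitMap sync.Map // object name -> chan waitResult
}

// newWaiter registers interest in an object and returns a channel to wait on
// plus a cleanup func (the analogue of ConditionWaiter.Close).
func (w *conditionWatcher) newWaiter(name string) (<-chan waitResult, func()) {
	ch := make(chan waitResult, 1)
	w.waitMap.Store(name, ch)
	return ch, func() { w.waitMap.Delete(name) }
}

// handleEvent is what the informer callbacks would invoke; objectDeleted
// corresponds to the objectDeleted flag passed to the patch's conditionFunc.
func (w *conditionWatcher) handleEvent(name string, objectDeleted bool, statusErr error) {
	v, ok := w.waitMap.Load(name)
	if !ok {
		return // nobody is waiting on this object
	}
	ch := v.(chan waitResult)
	switch {
	case statusErr != nil:
		ch <- waitResult{err: statusErr} // condition failed
	case objectDeleted:
		ch <- waitResult{} // condition met: object is gone
	}
	// otherwise the condition does not hold yet; keep waiting
}

func main() {
	w := &conditionWatcher{}
	ch, closeWaiter := w.newWaiter("replica-attachment-0")
	defer closeWaiter()

	// Simulate the informer observing the AzVolumeAttachment deletion.
	go func() {
		time.Sleep(100 * time.Millisecond)
		w.handleEvent("replica-attachment-0", true, nil)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	select {
	case res := <-ch:
		if res.err != nil {
			fmt.Println("replica detach failed:", res.err)
			return
		}
		fmt.Println("replica attachment deleted; safe to delete the underlying disk")
	case <-ctx.Done():
		fmt.Println("timed out waiting for detach:", ctx.Err())
	}
}

In the driver itself, the condition function (verifyDeleted in triggerDelete above) also inspects the attachment's Status.Error, so a failed detach surfaces as an error on the waiter rather than as a context timeout.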