diff --git a/pkg/azureconstants/azure_constants.go b/pkg/azureconstants/azure_constants.go index a442b23204..6061c27d8c 100644 --- a/pkg/azureconstants/azure_constants.go +++ b/pkg/azureconstants/azure_constants.go @@ -131,6 +131,7 @@ const ( CRIUpdateRetryDuration = time.Duration(1) * time.Second CRIUpdateRetryFactor = 3.0 CRIUpdateRetryStep = 5 + DefaultInformerResync = time.Duration(30) * time.Second ZonedField = "zoned" NormalUpdateMaxNetRetry = 0 ForcedUpdateMaxNetRetry = 10 diff --git a/pkg/controller/azvolume.go b/pkg/controller/azvolume.go index 6accc5f5f9..a14e0c07f4 100644 --- a/pkg/controller/azvolume.go +++ b/pkg/controller/azvolume.go @@ -34,6 +34,7 @@ import ( diskv1beta1 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta1" "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils" util "sigs.k8s.io/azuredisk-csi-driver/pkg/util" + "sigs.k8s.io/azuredisk-csi-driver/pkg/watcher" "sigs.k8s.io/azuredisk-csi-driver/pkg/workflow" consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants" @@ -226,21 +227,9 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *diskv1b } }() - // Delete all AzVolumeAttachment objects bound to the deleted AzVolume - var attachments []diskv1beta1.AzVolumeAttachment - attachments, err = r.controllerSharedState.cleanUpAzVolumeAttachmentByVolume(ctx, azVolume.Name, azvolume, all, mode) - if err != nil { - return err - } - - if len(attachments) > 0 { - err = status.Errorf(codes.Aborted, "volume deletion requeued until attached azVolumeAttachments are entirely detached...") - return err - } - // only try deleting underlying volume 1) if volume creation was successful and 2) volumeDeleteRequestAnnotation is present // if the annotation is not present, only delete the CRI and not the underlying volume - if isCreated(azVolume) && volumeDeleteRequested { + if isCreated(azVolume) && mode == detachAndDeleteCRI { // requeue if AzVolume's state is being updated by a different worker defer 
r.stateLock.Delete(azVolume.Name) if _, ok := r.stateLock.LoadOrStore(azVolume.Name, nil); ok { @@ -256,7 +245,10 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *diskv1b return err } - w.Logger().Info("Deleting Volume...") + if volumeDeleteRequested { + w.Logger().Info("Deleting Volume...") + } + waitCh := make(chan goSignal) //nolint:contextcheck // call is asynchronous; context is not inherited by design go func() { @@ -266,29 +258,101 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *diskv1b waitCh <- goSignal{} goCtx := goWorkflow.SaveToContext(context.Background()) - cloudCtx, cloudCancel := context.WithTimeout(goCtx, cloudTimeout) - defer cloudCancel() + + deleteCtx, deleteCancel := context.WithTimeout(goCtx, cloudTimeout) + defer deleteCancel() + + reportError := func(obj interface{}, err error) error { + azv := obj.(*diskv1beta1.AzVolume) + _ = r.updateError(azv, err) + _, derr := r.updateState(azv, diskv1beta1.VolumeDeletionFailed, forceUpdate) + return derr + } var updateFunc func(interface{}) error + var err error updateMode := azureutils.UpdateCRIStatus - deleteErr = r.deleteVolume(cloudCtx, azVolume) - if deleteErr != nil { + + // Delete all AzVolumeAttachment objects bound to the deleted AzVolume + var attachments []diskv1beta1.AzVolumeAttachment + attachments, err = r.controllerSharedState.cleanUpAzVolumeAttachmentByVolume(deleteCtx, azVolume.Name, azvolume, all, mode) + if err != nil { updateFunc = func(obj interface{}) error { - azv := obj.(*diskv1beta1.AzVolume) - azv = r.updateError(azv, deleteErr) - _, derr := r.updateState(azv, diskv1beta1.VolumeDeletionFailed, forceUpdate) - return derr + return reportError(obj, err) } } else { - updateFunc = func(obj interface{}) error { - azv := obj.(*diskv1beta1.AzVolume) - azv = r.deleteFinalizer(azv, map[string]bool{consts.AzVolumeFinalizer: true}) - _, derr := r.updateState(azv, diskv1beta1.VolumeDeleted, forceUpdate) - return derr + var wg 
sync.WaitGroup + errors := make([]error, len(attachments)) + numErrors := uint32(0) + + // start waiting for replica AzVolumeAttachment CRIs to be deleted + for i, attachment := range attachments { + waiter, werr := r.controllerSharedState.conditionWatcher.NewConditionWaiter(deleteCtx, watcher.AzVolumeAttachmentType, attachment.Name, verifyObjectDeleted) + if err = werr; err != nil { // assign the goroutine-level err (no := shadowing) so the volume deletion below is skipped + updateFunc = func(obj interface{}) error { + return reportError(obj, err) + } + break + } + + // wait async and report error to go channel + wg.Add(1) + go func(ctx context.Context, waiter *watcher.ConditionWaiter, i int) { + defer waiter.Close() + defer wg.Done() + _, err := waiter.Wait(ctx) + if err != nil { + errors[i] = err + atomic.AddUint32(&numErrors, 1) + } + }(deleteCtx, waiter, i) } - updateMode = azureutils.UpdateAll + wg.Wait() + + // if errors have been found with the wait calls, format the error msg and report via CRI + if numErrors > 0 { + var errMsgs []string + for i, derr := range errors { + if derr != nil { + errMsgs = append(errMsgs, fmt.Sprintf("%s: %v", attachments[i].Name, derr)) + } + } + err = status.Error(codes.Internal, strings.Join(errMsgs, ", ")) // Error, not Errorf: the joined text is data, not a format string + updateFunc = func(obj interface{}) error { + return reportError(obj, err) + } + } } + + if err == nil { + if volumeDeleteRequested { + cloudCtx, cloudCancel := context.WithTimeout(goCtx, cloudTimeout) + defer cloudCancel() + + deleteErr = r.deleteVolume(cloudCtx, azVolume) + } + if deleteErr != nil { + updateFunc = func(obj interface{}) error { + azv := obj.(*diskv1beta1.AzVolume) + azv = r.updateError(azv, deleteErr) + _, derr := r.updateState(azv, diskv1beta1.VolumeDeletionFailed, forceUpdate) + return derr + } + } else { + updateMode = azureutils.UpdateAll + updateFunc = func(obj interface{}) error { + azv := obj.(*diskv1beta1.AzVolume) + azv = r.deleteFinalizer(azv, map[string]bool{consts.AzVolumeFinalizer: true}) + var derr error + if volumeDeleteRequested { + _, derr = r.updateState(azv, diskv1beta1.VolumeDeleted, 
forceUpdate) + } + return derr + } + } + } + // UpdateCRIWithRetry should be called on a context w/o timeout when called in a separate goroutine as it is not going to be retriggered and leave the CRI in unrecoverable transient state instead. _ = azureutils.UpdateCRIWithRetry(goCtx, nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolume, updateFunc, consts.ForcedUpdateMaxNetRetry, updateMode) }() diff --git a/pkg/controller/azvolume_test.go b/pkg/controller/azvolume_test.go index 1549c9df4d..b221764123 100644 --- a/pkg/controller/azvolume_test.go +++ b/pkg/controller/azvolume_test.go @@ -20,12 +20,14 @@ import ( "context" "sync" "testing" - "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" fakev1 "k8s.io/client-go/kubernetes/fake" "k8s.io/klog/v2/klogr" @@ -35,6 +37,7 @@ import ( "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils" "sigs.k8s.io/azuredisk-csi-driver/pkg/controller/mockclient" "sigs.k8s.io/azuredisk-csi-driver/pkg/controller/mockvolumeprovisioner" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -218,7 +221,7 @@ func TestAzVolumeControllerReconcile(t *testing.T) { }, }, { - description: "[Failure] Should delete volume attachments and requeue when AzVolume is marked for deletion.", + description: "[Success] Should delete replica volume attachments and delete AzVolume respectively", request: testAzVolume0Request, setupFunc: func(t *testing.T, mockCtl *gomock.Controller) *ReconcileAzVolume { azVolume := testAzVolume0.DeepCopy() @@ -239,7 +242,6 @@ func TestAzVolumeControllerReconcile(t *testing.T) { mockCtl, testNamespace, azVolume, - &testPrimaryAzVolumeAttachment0, &testReplicaAzVolumeAttachment) 
mockClientsAndVolumeProvisioner(controller) @@ -248,14 +250,23 @@ func TestAzVolumeControllerReconcile(t *testing.T) { }, verifyFunc: func(t *testing.T, controller *ReconcileAzVolume, result reconcile.Result, err error) { require.NoError(t, err) - require.Greater(t, result.RequeueAfter, time.Duration(0)) - - azVolume, err := controller.controllerSharedState.azClient.DiskV1beta1().AzVolumes(testNamespace).Get(context.TODO(), testPersistentVolume0Name, metav1.GetOptions{}) + req, err := azureutils.CreateLabelRequirements(consts.VolumeNameLabel, selection.Equals, testPersistentVolume0Name) + require.NoError(t, err) + labelSelector := labels.NewSelector().Add(*req) + checkAzVolumeAttachmentDeletion := func() (bool, error) { + var attachments diskv1beta1.AzVolumeAttachmentList + err := controller.controllerSharedState.cachedClient.List(context.Background(), &attachments, &client.ListOptions{LabelSelector: labelSelector}) + return len(attachments.Items) == 0, err + } + err = wait.PollImmediate(verifyCRIInterval, verifyCRITimeout, checkAzVolumeAttachmentDeletion) + require.NoError(t, err) + checkAzVolumeDeletion := func() (bool, error) { + var azVolume diskv1beta1.AzVolume + err := controller.controllerSharedState.cachedClient.Get(context.Background(), types.NamespacedName{Namespace: controller.controllerSharedState.objectNamespace, Name: testPersistentVolume0Name}, &azVolume) + return azVolume.Status.State == diskv1beta1.VolumeDeleted, err + } + err = wait.PollImmediate(verifyCRIInterval, verifyCRITimeout, checkAzVolumeDeletion) require.NoError(t, err) - require.Equal(t, diskv1beta1.VolumeCreated, azVolume.Status.State) - - azVolumeAttachments, _ := controller.controllerSharedState.azClient.DiskV1beta1().AzVolumeAttachments(testNamespace).List(context.TODO(), metav1.ListOptions{}) - require.Len(t, azVolumeAttachments.Items, 0) }, }, } diff --git a/pkg/controller/common.go b/pkg/controller/common.go index e686efb9db..58f3d51b2a 100644 --- a/pkg/controller/common.go +++ 
b/pkg/controller/common.go @@ -43,8 +43,11 @@ import ( "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" diskv1beta1 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta1" azClientSet "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/client/clientset/versioned" + azurediskInformers "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/client/informers/externalversions" consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants" "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils" + "sigs.k8s.io/azuredisk-csi-driver/pkg/util" + "sigs.k8s.io/azuredisk-csi-driver/pkg/watcher" "sigs.k8s.io/azuredisk-csi-driver/pkg/workflow" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -269,10 +272,11 @@ type SharedState struct { cachedClient client.Client azClient azClientSet.Interface kubeClient kubernetes.Interface + conditionWatcher *watcher.ConditionWatcher } func NewSharedState(driverName, objectNamespace, topologyKey string, eventRecorder record.EventRecorder, cachedClient client.Client, azClient azClientSet.Interface, kubeClient kubernetes.Interface) *SharedState { - newSharedState := &SharedState{driverName: driverName, objectNamespace: objectNamespace, topologyKey: topologyKey, eventRecorder: eventRecorder, cachedClient: cachedClient, azClient: azClient, kubeClient: kubeClient} + newSharedState := &SharedState{driverName: driverName, objectNamespace: objectNamespace, topologyKey: topologyKey, eventRecorder: eventRecorder, cachedClient: cachedClient, azClient: azClient, kubeClient: kubeClient, conditionWatcher: watcher.New(context.Background(), azClient, azurediskInformers.NewSharedInformerFactory(azClient, consts.DefaultInformerResync), objectNamespace)} newSharedState.createReplicaRequestsQueue() return newSharedState } @@ -2178,3 +2182,16 @@ func (c *SharedState) getNodesForReplica(ctx context.Context, volumeName string, return filtered, nil } + +func verifyObjectDeleted(obj interface{}, objectDeleted bool) (bool, error) { + if obj == nil || objectDeleted { + return true, 
nil + } + + // otherwise, the volume detachment has either failed with error or pending + azVolumeAttachmentInstance := obj.(*diskv1beta1.AzVolumeAttachment) + if azVolumeAttachmentInstance.Status.Error != nil { + return false, util.ErrorFromAzError(azVolumeAttachmentInstance.Status.Error) + } + return false, nil +} diff --git a/pkg/controller/replica.go b/pkg/controller/replica.go index ca5faa0293..9d27193b1e 100644 --- a/pkg/controller/replica.go +++ b/pkg/controller/replica.go @@ -29,6 +29,7 @@ import ( diskv1beta1 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta1" consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants" "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils" + "sigs.k8s.io/azuredisk-csi-driver/pkg/watcher" "sigs.k8s.io/azuredisk-csi-driver/pkg/workflow" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -104,16 +105,11 @@ func (r *ReconcileReplica) Reconcile(ctx context.Context, request reconcile.Requ goCtx := context.Background() // wait for replica AzVolumeAttachment deletion - conditionFunc := func() (bool, error) { - var tmp diskv1beta1.AzVolumeAttachment - err := r.controllerSharedState.cachedClient.Get(goCtx, request.NamespacedName, &tmp) - if errors.IsNotFound(err) { - return true, nil - } - - return false, err - } - _ = wait.PollImmediateInfinite(deletionPollingInterval, conditionFunc) + // NewConditionWaiter returns a nil waiter on error, so only wait on and close a waiter that was actually created + if waiter, werr := r.controllerSharedState.conditionWatcher.NewConditionWaiter(goCtx, watcher.AzVolumeAttachmentType, azVolumeAttachment.Name, verifyObjectDeleted); werr == nil { + _, _ = waiter.Wait(goCtx) // best effort: replica management is triggered below regardless of the wait result + waiter.Close() + } // add replica management operation to the queue r.triggerManageReplica(goCtx, azVolumeAttachment.Spec.VolumeName) diff --git a/pkg/provisioner/crdprovisioner.go b/pkg/provisioner/crdprovisioner.go index 103ef231af..14b99efe90 100644 --- a/pkg/provisioner/crdprovisioner.go +++ b/pkg/provisioner/crdprovisioner.go @@ -43,19 +43,19 @@ import ( consts 
"sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants" "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils" "sigs.k8s.io/azuredisk-csi-driver/pkg/util" + "sigs.k8s.io/azuredisk-csi-driver/pkg/watcher" "sigs.k8s.io/azuredisk-csi-driver/pkg/workflow" ) type CrdProvisioner struct { azDiskClient azDiskClientSet.Interface namespace string - conditionWatcher *conditionWatcher + conditionWatcher *watcher.ConditionWatcher } const ( // TODO: Figure out good interval and timeout values, and make them configurable. - interval = time.Duration(1) * time.Second - informerResync = time.Duration(30) * time.Second + interval = time.Duration(1) * time.Second ) func NewCrdProvisioner(kubeConfig *rest.Config, objNamespace string) (*CrdProvisioner, error) { @@ -64,12 +64,12 @@ func NewCrdProvisioner(kubeConfig *rest.Config, objNamespace string) (*CrdProvis return nil, err } - informerFactory := azurediskInformers.NewSharedInformerFactory(diskClient, informerResync) + informerFactory := azurediskInformers.NewSharedInformerFactory(diskClient, consts.DefaultInformerResync) return &CrdProvisioner{ azDiskClient: diskClient, namespace: objNamespace, - conditionWatcher: newConditionWatcher(context.Background(), diskClient, informerFactory, objNamespace), + conditionWatcher: watcher.New(context.Background(), diskClient, informerFactory, objNamespace), }, nil } @@ -146,7 +146,7 @@ func (c *CrdProvisioner) CreateVolume( volumeContentSource *diskv1beta1.ContentVolumeSource, accessibilityReq *diskv1beta1.TopologyRequirement) (*diskv1beta1.AzVolumeStatusDetail, error) { var err error - azVLister := c.conditionWatcher.informerFactory.Disk().V1beta1().AzVolumes().Lister().AzVolumes(c.namespace) + azVLister := c.conditionWatcher.InformerFactory().Disk().V1beta1().AzVolumes().Lister().AzVolumes(c.namespace) azVolumeClient := c.azDiskClient.DiskV1beta1().AzVolumes(c.namespace) _, maxMountReplicaCount := azureutils.GetMaxSharesAndMaxMountReplicaCount(parameters, 
azureutils.HasMultiNodeAzVolumeCapabilityAccessMode(volumeCapabilities)) @@ -198,7 +198,7 @@ func (c *CrdProvisioner) CreateVolume( return nil } - if err = azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.informerFactory, nil, c.azDiskClient, azVolumeInstance, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateAll); err != nil { + if err = azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.InformerFactory(), nil, c.azDiskClient, azVolumeInstance, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateAll); err != nil { return nil, err } // if the error was caused by errors other than IsNotFound, return failure @@ -240,9 +240,9 @@ func (c *CrdProvisioner) CreateVolume( w.Logger().V(5).Info("Successfully created AzVolume CRI") } - var waiter *conditionWaiter - waiter, err = c.conditionWatcher.newConditionWaiter(ctx, azVolumeType, azVolumeName, func(obj interface{}, _ bool) (bool, error) { - if obj == nil { + var waiter *watcher.ConditionWaiter + waiter, err = c.conditionWatcher.NewConditionWaiter(ctx, watcher.AzVolumeType, azVolumeName, func(obj interface{}, objectDeleted bool) (bool, error) { + if obj == nil || objectDeleted { return false, nil } azVolumeInstance := obj.(*diskv1beta1.AzVolume) @@ -298,7 +298,7 @@ func (c *CrdProvisioner) DeleteVolume(ctx context.Context, volumeID string, secr // snapshot APIs through the CRD provisioner. // Replace them in all instances in this file. 
var err error - lister := c.conditionWatcher.informerFactory.Disk().V1beta1().AzVolumes().Lister().AzVolumes(c.namespace) + lister := c.conditionWatcher.InformerFactory().Disk().V1beta1().AzVolumes().Lister().AzVolumes(c.namespace) azVolumeClient := c.azDiskClient.DiskV1beta1().AzVolumes(c.namespace) var volumeName string @@ -323,8 +323,8 @@ func (c *CrdProvisioner) DeleteVolume(ctx context.Context, volumeID string, secr return err } - var waiter *conditionWaiter - waiter, err = c.conditionWatcher.newConditionWaiter(ctx, azVolumeType, azVolumeName, func(obj interface{}, objectDeleted bool) (bool, error) { + var waiter *watcher.ConditionWaiter + waiter, err = c.conditionWatcher.NewConditionWaiter(ctx, watcher.AzVolumeType, azVolumeName, func(obj interface{}, objectDeleted bool) (bool, error) { // if no object is found, object is deleted if obj == nil || objectDeleted { return true, nil @@ -369,7 +369,7 @@ func (c *CrdProvisioner) DeleteVolume(ctx context.Context, volumeID string, secr } // update AzVolume CRI with annotation and reset state with retry upon conflict - if err = azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.informerFactory, nil, c.azDiskClient, azVolumeInstance, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil { + if err = azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.InformerFactory(), nil, c.azDiskClient, azVolumeInstance, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil { return err } @@ -400,9 +400,9 @@ func (c *CrdProvisioner) PublishVolume( volumeContext map[string]string, ) (map[string]string, error) { var err error - azVALister := c.conditionWatcher.informerFactory.Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(c.namespace) - nodeLister := c.conditionWatcher.informerFactory.Disk().V1beta1().AzDriverNodes().Lister().AzDriverNodes(c.namespace) - volumeLister := 
c.conditionWatcher.informerFactory.Disk().V1beta1().AzVolumes().Lister().AzVolumes(c.namespace) + azVALister := c.conditionWatcher.InformerFactory().Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(c.namespace) + nodeLister := c.conditionWatcher.InformerFactory().Disk().V1beta1().AzDriverNodes().Lister().AzDriverNodes(c.namespace) + volumeLister := c.conditionWatcher.InformerFactory().Disk().V1beta1().AzVolumes().Lister().AzVolumes(c.namespace) azVAClient := c.azDiskClient.DiskV1beta1().AzVolumeAttachments(c.namespace) var volumeName string @@ -562,7 +562,7 @@ func (c *CrdProvisioner) PublishVolume( } updateMode = azureutils.UpdateAll } - err = azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.informerFactory, nil, c.azDiskClient, attachmentObj, updateFunc, consts.NormalUpdateMaxNetRetry, updateMode) + err = azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.InformerFactory(), nil, c.azDiskClient, attachmentObj, updateFunc, consts.NormalUpdateMaxNetRetry, updateMode) return publishContext, err } @@ -571,7 +571,7 @@ func (c *CrdProvisioner) WaitForAttach(ctx context.Context, volumeID, nodeID str ctx, w := workflow.New(ctx) defer func() { w.Finish(err) }() - lister := c.conditionWatcher.informerFactory.Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(c.namespace) + lister := c.conditionWatcher.InformerFactory().Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(c.namespace) var volumeName string volumeName, err = azureutils.GetDiskName(volumeID) @@ -587,9 +587,9 @@ func (c *CrdProvisioner) WaitForAttach(ctx context.Context, volumeID, nodeID str return nil, err } - var waiter *conditionWaiter - waiter, err = c.conditionWatcher.newConditionWaiter(ctx, azVolumeAttachmentType, attachmentName, func(obj interface{}, _ bool) (bool, error) { - if obj == nil { + var waiter *watcher.ConditionWaiter + waiter, err = c.conditionWatcher.NewConditionWaiter(ctx, watcher.AzVolumeAttachmentType, attachmentName, func(obj 
interface{}, objectDeleted bool) (bool, error) { + if obj == nil || objectDeleted { return false, nil } azVolumeAttachmentInstance := obj.(*diskv1beta1.AzVolumeAttachment) @@ -620,7 +620,7 @@ func (c *CrdProvisioner) WaitForAttach(ctx context.Context, volumeID, nodeID str updateInstance.Status.Error = nil return nil } - if err = azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.informerFactory, nil, c.azDiskClient, azVolumeAttachmentInstance, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil { + if err = azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.InformerFactory(), nil, c.azDiskClient, azVolumeAttachmentInstance, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil { return nil, err } } @@ -649,7 +649,7 @@ func (c *CrdProvisioner) UnpublishVolume( mode consts.UnpublishMode) error { var err error azVAClient := c.azDiskClient.DiskV1beta1().AzVolumeAttachments(c.namespace) - azVALister := c.conditionWatcher.informerFactory.Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(c.namespace) + azVALister := c.conditionWatcher.InformerFactory().Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(c.namespace) var volumeName string volumeName, err = azureutils.GetDiskName(volumeID) @@ -688,7 +688,7 @@ func (c *CrdProvisioner) shouldDemote(volumeName string, mode consts.UnpublishMo if mode == consts.Detach { return false, nil } - azVolumeInstance, err := c.conditionWatcher.informerFactory.Disk().V1beta1().AzVolumes().Lister().AzVolumes(c.namespace).Get(volumeName) + azVolumeInstance, err := c.conditionWatcher.InformerFactory().Disk().V1beta1().AzVolumes().Lister().AzVolumes(c.namespace).Get(volumeName) if err != nil { return false, err } @@ -714,7 +714,7 @@ func (c *CrdProvisioner) demoteVolume(ctx context.Context, azVAClient v1beta1.Az updateInstance.Labels[consts.RoleChangeLabel] = consts.Demoted return nil } - return azureutils.UpdateCRIWithRetry(ctx, 
c.conditionWatcher.informerFactory, nil, c.azDiskClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateAll) + return azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.InformerFactory(), nil, c.azDiskClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateAll) } func (c *CrdProvisioner) detachVolume(ctx context.Context, azVAClient v1beta1.AzVolumeAttachmentInterface, azVolumeAttachment *diskv1beta1.AzVolumeAttachment) error { @@ -765,7 +765,7 @@ func (c *CrdProvisioner) detachVolume(ctx context.Context, azVAClient v1beta1.Az w.Logger().V(5).Infof("Requesting AzVolumeAttachment (%s) detachment", azVolumeAttachment.Name) - if err = azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.informerFactory, nil, c.azDiskClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil { + if err = azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.InformerFactory(), nil, c.azDiskClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil { return err } @@ -796,10 +796,10 @@ func (c *CrdProvisioner) WaitForDetach(ctx context.Context, volumeID, nodeID str volumeName = strings.ToLower(volumeName) attachmentName := azureutils.GetAzVolumeAttachmentName(volumeName, nodeID) - lister := c.conditionWatcher.informerFactory.Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(c.namespace) + lister := c.conditionWatcher.InformerFactory().Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(c.namespace) - var waiter *conditionWaiter - waiter, err = c.conditionWatcher.newConditionWaiter(ctx, azVolumeAttachmentType, attachmentName, func(obj interface{}, objectDeleted bool) (bool, error) { + var waiter *watcher.ConditionWaiter + waiter, err = c.conditionWatcher.NewConditionWaiter(ctx, watcher.AzVolumeAttachmentType, attachmentName, func(obj interface{}, objectDeleted bool) (bool, error) { 
// if no object is found, return if obj == nil || objectDeleted { return true, nil @@ -832,7 +832,7 @@ func (c *CrdProvisioner) ExpandVolume( capacityRange *diskv1beta1.CapacityRange, secrets map[string]string) (*diskv1beta1.AzVolumeStatusDetail, error) { var err error - lister := c.conditionWatcher.informerFactory.Disk().V1beta1().AzVolumes().Lister().AzVolumes(c.namespace) + lister := c.conditionWatcher.InformerFactory().Disk().V1beta1().AzVolumes().Lister().AzVolumes(c.namespace) volumeName, err := azureutils.GetDiskName(volumeID) if err != nil { @@ -849,9 +849,9 @@ func (c *CrdProvisioner) ExpandVolume( ctx, w := workflow.New(ctx, workflow.WithDetails(workflow.GetObjectDetails(azVolume)...)) defer func() { w.Finish(err) }() - var waiter *conditionWaiter - waiter, err = c.conditionWatcher.newConditionWaiter(ctx, azVolumeType, azVolumeName, func(obj interface{}, _ bool) (bool, error) { - if obj == nil { + var waiter *watcher.ConditionWaiter + waiter, err = c.conditionWatcher.NewConditionWaiter(ctx, watcher.AzVolumeType, azVolumeName, func(obj interface{}, objectDeleted bool) (bool, error) { + if obj == nil || objectDeleted { return false, nil } azVolumeInstance := obj.(*diskv1beta1.AzVolume) @@ -902,7 +902,7 @@ func (c *CrdProvisioner) ExpandVolume( return nil, err } - if err = azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.informerFactory, nil, c.azDiskClient, azVolume, updateFunc, consts.NormalUpdateMaxNetRetry, updateMode); err != nil { + if err = azureutils.UpdateCRIWithRetry(ctx, c.conditionWatcher.InformerFactory(), nil, c.azDiskClient, azVolume, updateFunc, consts.NormalUpdateMaxNetRetry, updateMode); err != nil { return nil, err } @@ -934,7 +934,7 @@ func (c *CrdProvisioner) GetAzVolumeAttachment(ctx context.Context, volumeID str azVolumeAttachmentName := azureutils.GetAzVolumeAttachmentName(diskName, nodeID) var azVolumeAttachment *diskv1beta1.AzVolumeAttachment - if azVolumeAttachment, err = 
c.conditionWatcher.informerFactory.Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(c.namespace).Get(azVolumeAttachmentName); err != nil { + if azVolumeAttachment, err = c.conditionWatcher.InformerFactory().Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(c.namespace).Get(azVolumeAttachmentName); err != nil { return nil, err } diff --git a/pkg/provisioner/crdprovisioner_test.go b/pkg/provisioner/crdprovisioner_test.go index d74856e95f..97bd1b7610 100644 --- a/pkg/provisioner/crdprovisioner_test.go +++ b/pkg/provisioner/crdprovisioner_test.go @@ -37,6 +37,7 @@ import ( azurediskInformers "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/client/informers/externalversions" consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants" "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils" + "sigs.k8s.io/azuredisk-csi-driver/pkg/watcher" ) const ( @@ -147,14 +148,14 @@ func NewTestCrdProvisioner(controller *gomock.Controller) *CrdProvisioner { return &CrdProvisioner{ azDiskClient: fakeDiskClient, namespace: testNameSpace, - conditionWatcher: newConditionWatcher(context.Background(), fakeDiskClient, informerFactory, testNameSpace), + conditionWatcher: watcher.New(context.Background(), fakeDiskClient, informerFactory, testNameSpace), } } func UpdateTestCrdProvisionerWithNewClient(provisioner *CrdProvisioner, azDiskClient azDiskClientSet.Interface) { informerFactory := azurediskInformers.NewSharedInformerFactory(azDiskClient, testResync) provisioner.azDiskClient = azDiskClient - provisioner.conditionWatcher = newConditionWatcher(context.Background(), azDiskClient, informerFactory, testNameSpace) + provisioner.conditionWatcher = watcher.New(context.Background(), azDiskClient, informerFactory, testNameSpace) } func TestCrdProvisionerCreateVolume(t *testing.T) { diff --git a/pkg/provisioner/conditionwaiter.go b/pkg/watcher/conditionwaiter.go similarity index 81% rename from pkg/provisioner/conditionwaiter.go rename to pkg/watcher/conditionwaiter.go 
index efb2652d8a..437b42695f 100644 --- a/pkg/provisioner/conditionwaiter.go +++ b/pkg/watcher/conditionwaiter.go @@ -14,48 +14,49 @@ See the License for the specific language governing permissions and limitations under the License. */ -package provisioner +package watcher import ( "context" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/azuredisk-csi-driver/pkg/workflow" ) -type conditionWaiter struct { - objType objectType +type ConditionWaiter struct { + objType ObjectType objName string entry *waitEntry - watcher *conditionWatcher + watcher *ConditionWatcher } -func (w *conditionWaiter) Wait(ctx context.Context) (runtime.Object, error) { +func (w *ConditionWaiter) Wait(ctx context.Context) (runtime.Object, error) { var obj runtime.Object var err error namespace := w.watcher.namespace switch w.objType { - case azVolumeType: + case AzVolumeType: obj, err = w.watcher.informerFactory.Disk().V1beta1().AzVolumes().Lister().AzVolumes(namespace).Get(w.objName) - case azVolumeAttachmentType: + case AzVolumeAttachmentType: obj, err = w.watcher.informerFactory.Disk().V1beta1().AzVolumeAttachments().Lister().AzVolumeAttachments(namespace).Get(w.objName) - case azDriverNodeType: + case AzDriverNodeType: obj, err = w.watcher.informerFactory.Disk().V1beta1().AzDriverNodes().Lister().AzDriverNodes(namespace).Get(w.objName) } + _, wf := workflow.New(ctx, workflow.WithDetails(workflow.GetObjectDetails(obj)...)) + defer func() { wf.Finish(err) }() + // if there exists an object in cache, evaluate condition function on it // if condition function returns error, the error could be coming from stale cache, so wait for another condition assessment from event handler. 
- if err == nil { - success, _ := w.entry.conditionFunc(obj, false) + if notFound := errors.IsNotFound(err); err == nil || notFound { + success, _ := w.entry.conditionFunc(obj, notFound) if success { return obj, nil } } - _, wf := workflow.New(ctx, workflow.WithDetails(workflow.GetObjectDetails(obj)...)) - defer func() { wf.Finish(err) }() - // if not wait for the event handler signal select { case <-ctx.Done(): @@ -67,6 +68,6 @@ func (w *conditionWaiter) Wait(ctx context.Context) (runtime.Object, error) { } } -func (w *conditionWaiter) Close() { +func (w *ConditionWaiter) Close() { w.watcher.waitMap.Delete(getTypedName(w.objType, w.objName)) } diff --git a/pkg/provisioner/conditionwatcher.go b/pkg/watcher/conditionwatcher.go similarity index 79% rename from pkg/provisioner/conditionwatcher.go rename to pkg/watcher/conditionwatcher.go index bdfc4d69f0..d5a5cbf3a8 100644 --- a/pkg/provisioner/conditionwatcher.go +++ b/pkg/watcher/conditionwatcher.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package provisioner +package watcher import ( "context" @@ -35,12 +35,12 @@ import ( azurediskInformers "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/client/informers/externalversions" ) -type objectType string +type ObjectType string const ( - azVolumeAttachmentType objectType = "azvolumeattachments" - azVolumeType objectType = "azvolume" - azDriverNodeType objectType = "azdrivernode" + AzVolumeAttachmentType ObjectType = "azvolumeattachments" + AzVolumeType ObjectType = "azvolume" + AzDriverNodeType ObjectType = "azdrivernode" ) type eventType int @@ -61,20 +61,21 @@ type waitEntry struct { waitChan chan waitResult } -type conditionWatcher struct { +type ConditionWatcher struct { informerFactory azurediskInformers.SharedInformerFactory waitMap sync.Map // maps namespaced name to waitEntry namespace string } -func newConditionWatcher(ctx context.Context, azDiskClient azDiskClientSet.Interface, informerFactory azurediskInformers.SharedInformerFactory, namespace string) *conditionWatcher { +func New(ctx context.Context, azDiskClient azDiskClientSet.Interface, informerFactory azurediskInformers.SharedInformerFactory, namespace string) *ConditionWatcher { azVolumeAttachmentInformer := informerFactory.Disk().V1beta1().AzVolumeAttachments().Informer() azVolumeInformer := informerFactory.Disk().V1beta1().AzVolumes().Informer() azDriverNodeInformer := informerFactory.Disk().V1beta1().AzDriverNodes().Informer() - c := conditionWatcher{ + c := ConditionWatcher{ informerFactory: informerFactory, waitMap: sync.Map{}, + namespace: namespace, } azVolumeAttachmentInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -104,7 +105,11 @@ func newConditionWatcher(ctx context.Context, azDiskClient azDiskClientSet.Inter return &c } -func (c *conditionWatcher) newConditionWaiter(ctx context.Context, objType objectType, objName string, conditionFunc func(obj interface{}, expectDelete bool) (bool, error)) (*conditionWaiter, error) { +func (c *ConditionWatcher) InformerFactory() 
azurediskInformers.SharedInformerFactory { + return c.informerFactory +} + +func (c *ConditionWatcher) NewConditionWaiter(ctx context.Context, objType ObjectType, objName string, conditionFunc func(obj interface{}, expectDelete bool) (bool, error)) (*ConditionWaiter, error) { klog.V(5).Infof("Adding a condition function for %s (%s)", objType, objName) entry := waitEntry{ conditionFunc: conditionFunc, @@ -118,7 +123,7 @@ func (c *conditionWatcher) newConditionWaiter(ctx context.Context, objType objec return nil, err } - return &conditionWaiter{ + return &ConditionWaiter{ objType: objType, objName: objName, entry: &entry, @@ -126,33 +131,33 @@ func (c *conditionWatcher) newConditionWaiter(ctx context.Context, objType objec }, nil } -func (c *conditionWatcher) onCreate(obj interface{}) { +func (c *ConditionWatcher) onCreate(obj interface{}) { c.handleEvent(obj, create) } -func (c *conditionWatcher) onUpdate(_, newObj interface{}) { +func (c *ConditionWatcher) onUpdate(_, newObj interface{}) { c.handleEvent(newObj, update) } -func (c *conditionWatcher) onDelete(obj interface{}) { +func (c *ConditionWatcher) onDelete(obj interface{}) { c.handleEvent(obj, delete) } -func (c *conditionWatcher) handleEvent(obj interface{}, eventType eventType) { +func (c *ConditionWatcher) handleEvent(obj interface{}, eventType eventType) { metaObj, err := meta.Accessor(obj) if err != nil { // this line should not be reached klog.Errorf("object (%v) has not implemented meta object interface.") } - var objType objectType + var objType ObjectType switch obj.(type) { case *diskv1beta1.AzVolume: - objType = azVolumeType + objType = AzVolumeType case *diskv1beta1.AzVolumeAttachment: - objType = azVolumeAttachmentType + objType = AzVolumeAttachmentType case *diskv1beta1.AzDriverNode: - objType = azDriverNodeType + objType = AzDriverNodeType default: // unknown object type klog.Errorf("unsupported object type %v", reflect.TypeOf(obj)) @@ -193,6 +198,6 @@ func (c *conditionWatcher) handleEvent(obj 
interface{}, eventType eventType) { } } -func getTypedName(objType objectType, objName string) string { +func getTypedName(objType ObjectType, objName string) string { return fmt.Sprintf("%s/%s", string(objType), objName) }