From adc35f8239f226347fcb48ad34a4e3fb0107c74a Mon Sep 17 00:00:00 2001 From: Ed Reed Date: Sat, 25 Jun 2022 15:40:05 -0700 Subject: [PATCH 1/2] fix: Fix CRI update on error recovery --- pkg/azureutils/azure_disk_utils.go | 12 ++++++---- pkg/controller/attach_detach.go | 25 +++++++++++---------- pkg/controller/azvolume.go | 35 +++++++++++++++--------------- pkg/controller/pv.go | 11 +++++----- pkg/controller/replica.go | 2 +- pkg/controller/volumeattachment.go | 3 ++- pkg/provisioner/crdprovisioner.go | 23 ++++++++++---------- 7 files changed, 60 insertions(+), 51 deletions(-) diff --git a/pkg/azureutils/azure_disk_utils.go b/pkg/azureutils/azure_disk_utils.go index fcac5d3821..949a62b903 100644 --- a/pkg/azureutils/azure_disk_utils.go +++ b/pkg/azureutils/azure_disk_utils.go @@ -898,7 +898,9 @@ func GetAzVolumeAttachmentState(volumeAttachmentStatus storagev1.VolumeAttachmen } } -func UpdateCRIWithRetry(ctx context.Context, informerFactory azdiskinformers.SharedInformerFactory, cachedClient client.Client, azDiskClient azdisk.Interface, obj client.Object, updateFunc func(interface{}) error, maxNetRetry int, updateMode CRIUpdateMode) error { +type UpdateCRIFunc func(client.Object) error + +func UpdateCRIWithRetry(ctx context.Context, informerFactory azdiskinformers.SharedInformerFactory, cachedClient client.Client, azDiskClient azdisk.Interface, obj client.Object, updateFunc UpdateCRIFunc, maxNetRetry int, updateMode CRIUpdateMode) error { var err error objName := obj.GetName() ctx, w := workflow.New(ctx) @@ -1028,11 +1030,13 @@ func UpdateCRIWithRetry(ctx context.Context, informerFactory azdiskinformers.Sha return err } -func AppendToUpdateFunc(updateFunc, newFunc func(interface{}) error) func(interface{}) error { +func AppendToUpdateCRIFunc(updateFunc, newFunc UpdateCRIFunc) UpdateCRIFunc { if updateFunc != nil { innerFunc := updateFunc - return func(obj interface{}) error { - obj = innerFunc(obj) + return func(obj client.Object) error { + if err := innerFunc(obj); err != nil { + return err + } return newFunc(obj) } } diff --git a/pkg/controller/attach_detach.go b/pkg/controller/attach_detach.go index 4c3887f300..aa283073a2 100644 --- a/pkg/controller/attach_detach.go +++ b/pkg/controller/attach_detach.go @@ -38,6 +38,7 @@ import ( "sigs.k8s.io/azuredisk-csi-driver/pkg/workflow" volerr "k8s.io/cloud-provider/volume/errors" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -158,7 +159,7 @@ func (r *ReconcileAttachDetach) triggerAttach(ctx context.Context, azVolumeAttac } // update status block - updateFunc := func(obj interface{}) error { + updateFunc := func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolumeAttachment) // Update state to attaching, Initialize finalizer and add label to the object _, derr := updateState(azv, azdiskv1beta2.Attaching, normalUpdate) @@ -248,7 +249,7 @@ func (r *ReconcileAttachDetach) triggerAttach(ctx context.Context, azVolumeAttac } } - updateFunc := func(obj interface{}) error { + updateFunc := func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolumeAttachment) azv = updateError(azv, attachErr) _, uerr := updateState(azv, azdiskv1beta2.AttachmentFailed, forceUpdate) @@ -264,7 +265,7 @@ func (r *ReconcileAttachDetach) triggerAttach(ctx context.Context, azVolumeAttac } } - updateFunc := func(obj interface{}) error { + updateFunc := func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolumeAttachment) azv = updateStatusDetail(azv, publishCtx) var uerr error @@ -300,7 +301,7 @@ func (r *ReconcileAttachDetach) triggerDetach(ctx context.Context, azVolumeAttac return getOperationRequeueError("detach", azVolumeAttachment) } - updateFunc := func(obj interface{}) error { + updateFunc := func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolumeAttachment) // Update state to detaching _, derr := updateState(azv, azdiskv1beta2.Detaching, normalUpdate) @@ -323,18 +324,18 @@ func (r *ReconcileAttachDetach) triggerDetach(ctx context.Context, azVolumeAttac cloudCtx, cloudCancel := context.WithTimeout(goCtx, cloudTimeout) defer cloudCancel() - var updateFunc func(obj interface{}) error + var updateFunc func(obj client.Object) error updateMode := azureutils.UpdateCRIStatus detachErr = r.detachVolume(cloudCtx, azVolumeAttachment.Spec.VolumeID, azVolumeAttachment.Spec.NodeName) if detachErr != nil { - updateFunc = func(obj interface{}) error { + updateFunc = func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolumeAttachment) azv = updateError(azv, detachErr) _, derr := updateState(azv, azdiskv1beta2.DetachmentFailed, forceUpdate) return derr } } else { - updateFunc = func(obj interface{}) error { + updateFunc = func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolumeAttachment) _ = r.deleteFinalizer(azv) return nil @@ -346,7 +347,7 @@ func (r *ReconcileAttachDetach) triggerDetach(ctx context.Context, azVolumeAttac }() <-waitCh } else { - updateFunc := func(obj interface{}) error { + updateFunc := func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolumeAttachment) // delete finalizer _ = r.deleteFinalizer(azv) @@ -369,7 +370,7 @@ func (r *ReconcileAttachDetach) promote(ctx context.Context, azVolumeAttachment return err } // initialize metadata and update status block - updateFunc := func(obj interface{}) error { + updateFunc := func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolumeAttachment) _ = updateRole(azv, azdiskv1beta2.PrimaryRole) return nil @@ -387,7 +388,7 @@ func (r *ReconcileAttachDetach) demote(ctx context.Context, azVolumeAttachment * w.Logger().Info("Demoting AzVolumeAttachment") // initialize metadata and update status block - updateFunc := func(obj interface{}) error { + updateFunc := func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolumeAttachment) delete(azv.Status.Annotations, consts.VolumeAttachmentKey) _ = updateRole(azv, azdiskv1beta2.ReplicaRole) @@ -407,7 +408,7 @@ func (r *ReconcileAttachDetach) updateVolumeAttachmentWithResult(ctx context.Con return err } - vaUpdateFunc := func(obj interface{}) error { + vaUpdateFunc := func(obj client.Object) error { va := obj.(*storagev1.VolumeAttachment) if azVolumeAttachment.Status.Detail != nil { for key, value := range azVolumeAttachment.Status.Detail.PublishContext { @@ -661,7 +662,7 @@ func (r *ReconcileAttachDetach) recoverAzVolumeAttachment(ctx context.Context, r go func(azv azdiskv1beta2.AzVolumeAttachment, azvMap *sync.Map) { defer wg.Done() var targetState azdiskv1beta2.AzVolumeAttachmentAttachmentState - updateFunc := func(obj interface{}) error { + updateFunc := func(obj client.Object) error { var err error azv := obj.(*azdiskv1beta2.AzVolumeAttachment) // add a recover annotation to the CRI so that reconciliation can be triggered for the CRI even if CRI's current state == target state diff --git a/pkg/controller/azvolume.go b/pkg/controller/azvolume.go index 507a861b88..f96c52096f 100644 --- a/pkg/controller/azvolume.go +++ b/pkg/controller/azvolume.go @@ -38,6 +38,7 @@ import ( "sigs.k8s.io/azuredisk-csi-driver/pkg/workflow" consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -145,7 +146,7 @@ func (r *ReconcileAzVolume) triggerCreate(ctx context.Context, azVolume *azdiskv } // update state - updateFunc := func(obj interface{}) error { + updateFunc := func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolume) _, err := r.updateState(azv, azdiskv1beta2.VolumeCreating, normalUpdate) return err @@ -169,12 +170,12 @@ func (r *ReconcileAzVolume) triggerCreate(ctx context.Context, azVolume *azdiskv cloudCtx, cloudCancel := context.WithTimeout(goCtx, cloudTimeout) defer cloudCancel() - var updateFunc func(interface{}) error + var updateFunc azureutils.UpdateCRIFunc var response *azdiskv1beta2.AzVolumeStatusDetail response, createErr = r.createVolume(cloudCtx, azVolume) updateMode := azureutils.UpdateCRIStatus if createErr != nil { - updateFunc = func(obj interface{}) error { + updateFunc = func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolume) azv = r.updateError(azv, createErr) azv = r.deleteFinalizer(azv, map[string]bool{consts.AzVolumeFinalizer: true}) @@ -185,7 +186,7 @@ func (r *ReconcileAzVolume) triggerCreate(ctx context.Context, azVolume *azdiskv } else { // create operation queue for the volume r.controllerSharedState.createOperationQueue(azVolume.Name) - updateFunc = func(obj interface{}) error { + updateFunc = func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolume) if response == nil { return status.Errorf(codes.Internal, "non-nil AzVolumeStatusDetail expected but nil given") @@ -235,7 +236,7 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *azdiskv if _, ok := r.stateLock.LoadOrStore(azVolume.Name, nil); ok { return getOperationRequeueError("delete", azVolume) } - updateFunc := func(obj interface{}) error { + updateFunc := func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolume) _, derr := r.updateState(azv, azdiskv1beta2.VolumeDeleting, normalUpdate) return derr @@ -269,7 +270,7 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *azdiskv return derr } - var updateFunc func(interface{}) error + var updateFunc azureutils.UpdateCRIFunc var err error updateMode := azureutils.UpdateCRIStatus @@ -277,7 +278,7 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *azdiskv var attachments []azdiskv1beta2.AzVolumeAttachment attachments, err = r.controllerSharedState.cleanUpAzVolumeAttachmentByVolume(deleteCtx, azVolume.Name, azvolume, all, mode) if err != nil { - updateFunc = func(obj interface{}) error { + updateFunc = func(obj client.Object) error { return reportError(obj, err) } } else { @@ -289,7 +290,7 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *azdiskv for i, attachment := range attachments { waiter, err := r.controllerSharedState.conditionWatcher.NewConditionWaiter(deleteCtx, watcher.AzVolumeAttachmentType, attachment.Name, verifyObjectDeleted) if err != nil { - updateFunc = func(obj interface{}) error { + updateFunc = func(obj client.Object) error { return reportError(obj, err) } break @@ -319,7 +320,7 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *azdiskv } } err = status.Errorf(codes.Internal, strings.Join(errMsgs, ", ")) - updateFunc = func(obj interface{}) error { + updateFunc = func(obj client.Object) error { return reportError(obj, err) } } @@ -333,7 +334,7 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *azdiskv deleteErr = r.deleteVolume(cloudCtx, azVolume) } if deleteErr != nil { - updateFunc = func(obj interface{}) error { + updateFunc = func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolume) azv = r.updateError(azv, deleteErr) _, derr := r.updateState(azv, azdiskv1beta2.VolumeDeletionFailed, forceUpdate) @@ -341,7 +342,7 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *azdiskv } } else { updateMode = azureutils.UpdateAll - updateFunc = func(obj interface{}) error { + updateFunc = func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolume) _ = r.deleteFinalizer(azv, map[string]bool{consts.AzVolumeFinalizer: true}) return nil @@ -354,7 +355,7 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *azdiskv }() <-waitCh } else { - updateFunc := func(obj interface{}) error { + updateFunc := func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolume) _ = r.deleteFinalizer(azv, map[string]bool{consts.AzVolumeFinalizer: true}) return nil @@ -377,7 +378,7 @@ func (r *ReconcileAzVolume) triggerUpdate(ctx context.Context, azVolume *azdiskv err = getOperationRequeueError("update", azVolume) return err } - updateFunc := func(obj interface{}) error { + updateFunc := func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolume) _, derr := r.updateState(azv, azdiskv1beta2.VolumeUpdating, normalUpdate) return derr @@ -399,18 +400,18 @@ func (r *ReconcileAzVolume) triggerUpdate(ctx context.Context, azVolume *azdiskv cloudCtx, cloudCancel := context.WithTimeout(goCtx, cloudTimeout) defer cloudCancel() - var updateFunc func(interface{}) error + var updateFunc azureutils.UpdateCRIFunc var response *azdiskv1beta2.AzVolumeStatusDetail response, updateErr = r.expandVolume(cloudCtx, azVolume) if updateErr != nil { - updateFunc = func(obj interface{}) error { + updateFunc = func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolume) azv = r.updateError(azv, updateErr) _, derr := r.updateState(azv, azdiskv1beta2.VolumeUpdateFailed, forceUpdate) return derr } } else { - updateFunc = func(obj interface{}) error { + updateFunc = func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolume) if response == nil { return status.Errorf(codes.Internal, "non-nil AzVolumeStatusDetail expected but nil given") @@ -555,7 +556,7 @@ func (r *ReconcileAzVolume) recoverAzVolume(ctx context.Context, recoveredAzVolu go func(azv azdiskv1beta2.AzVolume, azvMap *sync.Map) { defer wg.Done() var targetState azdiskv1beta2.AzVolumeState - updateFunc := func(obj interface{}) error { + updateFunc := func(obj client.Object) error { var err error azv := obj.(*azdiskv1beta2.AzVolume) // add a recover annotation to the CRI so that reconciliation can be triggered for the CRI even if CRI's current state == target state diff --git a/pkg/controller/pv.go b/pkg/controller/pv.go index ad8eb0be67..d4fb68330a 100644 --- a/pkg/controller/pv.go +++ b/pkg/controller/pv.go @@ -33,6 +33,7 @@ import ( "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils" "sigs.k8s.io/azuredisk-csi-driver/pkg/workflow" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -114,7 +115,7 @@ func (r *ReconcilePV) Reconcile(ctx context.Context, request reconcile.Request) } // AzVolume does exist and needs to be deleted // add annotation to mark AzVolumeAttachment cleanup - updateFunc := func(obj interface{}) error { + updateFunc := func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolume) azv.Status.Annotations = azureutils.AddToMap(azv.Status.Annotations, consts.PreProvisionedVolumeCleanupAnnotation, "true") return nil @@ -156,10 +157,10 @@ func (r *ReconcilePV) Reconcile(ctx context.Context, request reconcile.Request) return reconcileReturnOnSuccess(pv.Name, r.controllerRetryInfo) } - var azVolumeUpdateFunc func(interface{}) error + var azVolumeUpdateFunc azureutils.UpdateCRIFunc if azVolume.Spec.PersistentVolume != pv.Name { - azVolumeUpdateFunc = func(obj interface{}) error { + azVolumeUpdateFunc = func(obj client.Object) error { azVolume := obj.(*azdiskv1beta2.AzVolume) azVolume.Spec.PersistentVolume = pv.Name return nil @@ -171,7 +172,7 @@ func (r *ReconcilePV) Reconcile(ctx context.Context, request reconcile.Request) pvcNamespace, pvcNamespaceLabelExists := azureutils.GetFromMap(azVolume.Labels, consts.PvcNamespaceLabel) if pv.Spec.ClaimRef != nil { if !pvcLabelExists || pv.Spec.ClaimRef.Name != pvcName || !pvcNamespaceLabelExists || pv.Spec.ClaimRef.Namespace != pvcNamespace { - azureutils.AppendToUpdateFunc(azVolumeUpdateFunc, func(obj interface{}) error { + azVolumeUpdateFunc = azureutils.AppendToUpdateCRIFunc(azVolumeUpdateFunc, func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolume) azv.Labels = azureutils.AddToMap(azv.Labels, consts.PvcNameLabel, pv.Spec.ClaimRef.Name, consts.PvcNamespaceLabel, pv.Spec.ClaimRef.Namespace) return nil @@ -179,7 +180,7 @@ func (r *ReconcilePV) Reconcile(ctx context.Context, request reconcile.Request) } } else { if pvcLabelExists || pvcNamespaceLabelExists { - azureutils.AppendToUpdateFunc(azVolumeUpdateFunc, func(obj interface{}) error { + azVolumeUpdateFunc = azureutils.AppendToUpdateCRIFunc(azVolumeUpdateFunc, func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolume) delete(azv.Labels, consts.PvcNameLabel) delete(azv.Labels, consts.PvcNamespaceLabel) diff --git a/pkg/controller/replica.go b/pkg/controller/replica.go index 7d9ea2b936..0aa7decb23 100644 --- a/pkg/controller/replica.go +++ b/pkg/controller/replica.go @@ -92,7 +92,7 @@ func (r *ReconcileReplica) Reconcile(ctx context.Context, request reconcile.Requ if objectDeletionRequested(azVolumeAttachment) { switch azVolumeAttachment.Status.State { case azdiskv1beta2.DetachmentFailed: - if err := azureutils.UpdateCRIWithRetry(ctx, nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolumeAttachment, func(obj interface{}) error { + if err := azureutils.UpdateCRIWithRetry(ctx, nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolumeAttachment, func(obj client.Object) error { azVolumeAttachment := obj.(*azdiskv1beta2.AzVolumeAttachment) _, err = updateState(azVolumeAttachment, azdiskv1beta2.ForceDetachPending, normalUpdate) return err diff --git a/pkg/controller/volumeattachment.go b/pkg/controller/volumeattachment.go index ddc10eb87c..fabbe8f321 100644 --- a/pkg/controller/volumeattachment.go +++ b/pkg/controller/volumeattachment.go @@ -30,6 +30,7 @@ import ( azdiskv1beta2 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta2" "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -90,7 +91,7 @@ func (r *ReconcileVolumeAttachment) Reconcile(ctx context.Context, request recon return reconcileReturnOnError(ctx, &volumeAttachment, "get azvolumeattachment", err, r.controllerRetryInfo) } - updateFunc := func(obj interface{}) error { + updateFunc := func(obj client.Object) error { azv := obj.(*azdiskv1beta2.AzVolumeAttachment) // Update the annotation of AzVolumeAttachment with volumeAttachment name azv.Status.Annotations = azureutils.AddToMap(azv.Status.Annotations, consts.VolumeAttachmentKey, volumeAttachment.Name) diff --git a/pkg/provisioner/crdprovisioner.go b/pkg/provisioner/crdprovisioner.go index 2c254ceea6..80318f11f8 100644 --- a/pkg/provisioner/crdprovisioner.go +++ b/pkg/provisioner/crdprovisioner.go @@ -44,6 +44,7 @@ import ( "sigs.k8s.io/azuredisk-csi-driver/pkg/util" "sigs.k8s.io/azuredisk-csi-driver/pkg/watcher" "sigs.k8s.io/azuredisk-csi-driver/pkg/workflow" + "sigs.k8s.io/controller-runtime/pkg/client" ) type CrdProvisioner struct { @@ -179,7 +180,7 @@ func (c *CrdProvisioner) CreateVolume( w.Logger().V(5).Info("Requeuing CreateVolume request") // otherwise requeue operation - updateFunc := func(obj interface{}) error { + updateFunc := func(obj client.Object) error { updateInstance := obj.(*azdiskv1beta2.AzVolume) updateInstance.Status.Error = nil updateInstance.Status.State = azdiskv1beta2.VolumeOperationPending @@ -321,7 +322,7 @@ func (c *CrdProvisioner) DeleteVolume(ctx context.Context, volumeID string, secr } // if deletion failed requeue deletion - updateFunc := func(obj interface{}) error { + updateFunc := func(obj client.Object) error { updateInstance := obj.(*azdiskv1beta2.AzVolume) w.AnnotateObject(updateInstance) updateInstance.Status.Annotations = azureutils.AddToMap(updateInstance.Status.Annotations, consts.VolumeDeleteRequestAnnotation, "cloud-delete-volume") @@ -514,7 +515,7 @@ func (c *CrdProvisioner) PublishVolume( return publishContext, err } - var updateFunc func(obj interface{}) error + var updateFunc func(obj client.Object) error updateMode := azureutils.UpdateCRIStatus // if attachment is scheduled for deletion, new attachment should only be created after the deletion if attachmentObj.DeletionTimestamp != nil { @@ -528,7 +529,7 @@ func (c *CrdProvisioner) PublishVolume( return publishContext, err } // if primary attachment failed with an error, and for whatever reason, controllerPublishVolume request was made instead of NodeStageVolume request, reset the error here if ever reached - updateFunc = func(obj interface{}) error { + updateFunc = func(obj client.Object) error { updateInstance := obj.(*azdiskv1beta2.AzVolumeAttachment) w.AnnotateObject(updateInstance) updateInstance.Status.State = azdiskv1beta2.AttachmentPending @@ -543,7 +544,7 @@ func (c *CrdProvisioner) PublishVolume( } // otherwise, there is a replica attachment for this volume-node pair, so promote it to primary and return success - updateFunc = func(obj interface{}) error { + updateFunc = func(obj client.Object) error { updateInstance := obj.(*azdiskv1beta2.AzVolumeAttachment) w.AnnotateObject(updateInstance) updateInstance.Spec.RequestedRole = azdiskv1beta2.PrimaryRole @@ -616,7 +617,7 @@ func (c *CrdProvisioner) waitForLunOrAttach(ctx context.Context, volumeID, nodeI return azVolumeAttachmentInstance, err } else if err != nil { // If the attachment had previously failed with an error, reset the error and state, and retrigger attach - updateFunc := func(obj interface{}) error { + updateFunc := func(obj client.Object) error { updateInstance := obj.(*azdiskv1beta2.AzVolumeAttachment) updateInstance.Status.State = azdiskv1beta2.AttachmentPending updateInstance.Status.Error = nil @@ -710,7 +711,7 @@ func (c *CrdProvisioner) demoteVolume(ctx context.Context, azVolumeAttachment *a ctx, w := workflow.GetWorkflowFromObj(ctx, azVolumeAttachment) w.Logger().V(5).Infof("Requesting AzVolumeAttachment (%s) demotion", azVolumeAttachment.Name) - updateFunc := func(obj interface{}) error { + updateFunc := func(obj client.Object) error { updateInstance := obj.(*azdiskv1beta2.AzVolumeAttachment) w.AnnotateObject(updateInstance) updateInstance.Spec.RequestedRole = azdiskv1beta2.ReplicaRole @@ -737,7 +738,7 @@ func (c *CrdProvisioner) detachVolume(ctx context.Context, azVolumeAttachment *a ctx, w := workflow.GetWorkflowFromObj(ctx, azVolumeAttachment) - updateFunc := func(obj interface{}) error { + updateFunc := func(obj client.Object) error { updateInstance := obj.(*azdiskv1beta2.AzVolumeAttachment) if updateInstance.Annotations == nil { updateInstance.Annotations = map[string]string{} @@ -758,7 +759,7 @@ func (c *CrdProvisioner) detachVolume(ctx context.Context, azVolumeAttachment *a } case azdiskv1beta2.DetachmentFailed: // if detachment failed, reset error and state to retrigger operation - azureutils.AppendToUpdateFunc(updateFunc, func(obj interface{}) error { + updateFunc = azureutils.AppendToUpdateCRIFunc(updateFunc, func(obj client.Object) error { updateInstance := obj.(*azdiskv1beta2.AzVolumeAttachment) // remove detachment failure error from AzVolumeAttachment CRI to retrigger detachment @@ -865,7 +866,7 @@ func (c *CrdProvisioner) ExpandVolume( defer waiter.Close() - updateFunc := func(obj interface{}) error { + updateFunc := func(obj client.Object) error { updateInstance := obj.(*azdiskv1beta2.AzVolume) w.AnnotateObject(updateInstance) updateInstance.Spec.CapacityRange = capacityRange @@ -879,7 +880,7 @@ func (c *CrdProvisioner) ExpandVolume( err = status.Errorf(codes.Aborted, "expand operation still in process") return nil, err case azdiskv1beta2.VolumeUpdateFailed: - azureutils.AppendToUpdateFunc(updateFunc, func(obj interface{}) error { + updateFunc = azureutils.AppendToUpdateCRIFunc(updateFunc, func(obj client.Object) error { updateInstance := obj.(*azdiskv1beta2.AzVolume) updateInstance.Status.Error = nil updateInstance.Status.State = azdiskv1beta2.VolumeCreated From f858b4dc2cb61c87cb07d50e6b73975db5d0455f Mon Sep 17 00:00:00 2001 From: Ed Reed Date: Sat, 25 Jun 2022 19:01:11 -0700 Subject: [PATCH 2/2] fix: Ensure final CRI status update occurs on context deadline exceeded --- pkg/controller/attach_detach.go | 14 ++++++++++---- pkg/controller/azvolume.go | 12 ++++++------ 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/pkg/controller/attach_detach.go b/pkg/controller/attach_detach.go index aa283073a2..5d5d1bedc6 100644 --- a/pkg/controller/attach_detach.go +++ b/pkg/controller/attach_detach.go @@ -255,7 +255,8 @@ func (r *ReconcileAttachDetach) triggerAttach(ctx context.Context, azVolumeAttac _, uerr := updateState(azv, azdiskv1beta2.AttachmentFailed, forceUpdate) return uerr } - _ = azureutils.UpdateCRIWithRetry(goCtx, nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolumeAttachment, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus) + //nolint:contextcheck // final status update of the CRI must occur even when the current context's deadline passes. + _ = azureutils.UpdateCRIWithRetry(context.Background(), nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolumeAttachment, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus) } handleSuccess = func(asyncComplete bool) { // Publish event to indicate attachment success @@ -278,7 +279,12 @@ func (r *ReconcileAttachDetach) triggerAttach(ctx context.Context, azVolumeAttac if asyncComplete && azVolumeAttachment.Spec.RequestedRole == azdiskv1beta2.PrimaryRole { _ = r.updateVolumeAttachmentWithResult(goCtx, azVolumeAttachment) } - _ = azureutils.UpdateCRIWithRetry(goCtx, nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolumeAttachment, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus) + updateCtx := goCtx + if asyncComplete { + //nolint:contextcheck // final status update of the CRI must occur even when the current context's deadline passes. + updateCtx = context.Background() + } + _ = azureutils.UpdateCRIWithRetry(updateCtx, nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolumeAttachment, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus) } attachAndUpdate() @@ -342,8 +348,8 @@ func (r *ReconcileAttachDetach) triggerDetach(ctx context.Context, azVolumeAttac } updateMode = azureutils.UpdateCRI } - // UpdateCRIWithRetry should be called on a context w/o timeout when called in a separate goroutine as it is not going to be retriggered and leave the CRI in unrecoverable transient state instead. - _ = azureutils.UpdateCRIWithRetry(goCtx, nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolumeAttachment, updateFunc, consts.ForcedUpdateMaxNetRetry, updateMode) + //nolint:contextcheck // final status update of the CRI must occur even when the current context's deadline passes. + _ = azureutils.UpdateCRIWithRetry(context.Background(), nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolumeAttachment, updateFunc, consts.ForcedUpdateMaxNetRetry, updateMode) }() <-waitCh } else { diff --git a/pkg/controller/azvolume.go b/pkg/controller/azvolume.go index f96c52096f..074142b977 100644 --- a/pkg/controller/azvolume.go +++ b/pkg/controller/azvolume.go @@ -197,8 +197,8 @@ func (r *ReconcileAzVolume) triggerCreate(ctx context.Context, azVolume *azdiskv } } - // UpdateCRIWithRetry should be called on a context w/o timeout when called in a separate goroutine as it is not going to be retriggered and leave the CRI in unrecoverable transient state instead. - _ = azureutils.UpdateCRIWithRetry(goCtx, nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolume, updateFunc, consts.ForcedUpdateMaxNetRetry, updateMode) + //nolint:contextcheck // final status update of the CRI must occur even when the current context's deadline passes. + _ = azureutils.UpdateCRIWithRetry(context.Background(), nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolume, updateFunc, consts.ForcedUpdateMaxNetRetry, updateMode) }() // wait for the workflow in goroutine to be created @@ -350,8 +350,8 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *azdiskv } } - // UpdateCRIWithRetry should be called on a context w/o timeout when called in a separate goroutine as it is not going to be retriggered and leave the CRI in unrecoverable transient state instead. - _ = azureutils.UpdateCRIWithRetry(goCtx, nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolume, updateFunc, consts.ForcedUpdateMaxNetRetry, updateMode) + //nolint:contextcheck // final status update of the CRI must occur even when the current context's deadline passes. + _ = azureutils.UpdateCRIWithRetry(context.Background(), nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolume, updateFunc, consts.ForcedUpdateMaxNetRetry, updateMode) }() <-waitCh } else { @@ -422,8 +422,8 @@ func (r *ReconcileAzVolume) triggerUpdate(ctx context.Context, azVolume *azdiskv } } - // UpdateCRIWithRetry should be called on a context w/o timeout when called in a separate goroutine as it is not going to be retriggered and leave the CRI in unrecoverable transient state instead. - _ = azureutils.UpdateCRIWithRetry(goCtx, nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolume, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus) + //nolint:contextcheck // final status update of the CRI must occur even when the current context's deadline passes. + _ = azureutils.UpdateCRIWithRetry(context.Background(), nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolume, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus) }() <-waitCh