Skip to content

Commit

Permalink
Merge pull request #1399 from edreed/edreed-fix-update-func-chaining
Browse files (browse the repository at this point in the history)
[V2] fix: Fix CRI update on error recovery
  • Loading branch information
edreed authored Jun 26, 2022
2 parents 093653d + f858b4d commit a91aecf
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 61 deletions.
12 changes: 8 additions & 4 deletions pkg/azureutils/azure_disk_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,9 @@ func GetAzVolumeAttachmentState(volumeAttachmentStatus storagev1.VolumeAttachmen
}
}

func UpdateCRIWithRetry(ctx context.Context, informerFactory azdiskinformers.SharedInformerFactory, cachedClient client.Client, azDiskClient azdisk.Interface, obj client.Object, updateFunc func(interface{}) error, maxNetRetry int, updateMode CRIUpdateMode) error {
// UpdateCRIFunc is the callback type accepted by UpdateCRIWithRetry and
// composed by AppendToUpdateCRIFunc. It receives the object being updated as
// a client.Object and returns a non-nil error to report failure. Callers
// type-assert the argument to the concrete type they expect and modify it in
// place.
type UpdateCRIFunc func(client.Object) error

func UpdateCRIWithRetry(ctx context.Context, informerFactory azdiskinformers.SharedInformerFactory, cachedClient client.Client, azDiskClient azdisk.Interface, obj client.Object, updateFunc UpdateCRIFunc, maxNetRetry int, updateMode CRIUpdateMode) error {
var err error
objName := obj.GetName()
ctx, w := workflow.New(ctx)
Expand Down Expand Up @@ -1028,11 +1030,13 @@ func UpdateCRIWithRetry(ctx context.Context, informerFactory azdiskinformers.Sha
return err
}

func AppendToUpdateFunc(updateFunc, newFunc func(interface{}) error) func(interface{}) error {
func AppendToUpdateCRIFunc(updateFunc, newFunc UpdateCRIFunc) UpdateCRIFunc {
if updateFunc != nil {
innerFunc := updateFunc
return func(obj interface{}) error {
obj = innerFunc(obj)
return func(obj client.Object) error {
if err := innerFunc(obj); err != nil {
return err
}
return newFunc(obj)
}
}
Expand Down
39 changes: 23 additions & 16 deletions pkg/controller/attach_detach.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"sigs.k8s.io/azuredisk-csi-driver/pkg/workflow"

volerr "k8s.io/cloud-provider/volume/errors"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down Expand Up @@ -158,7 +159,7 @@ func (r *ReconcileAttachDetach) triggerAttach(ctx context.Context, azVolumeAttac
}

// update status block
updateFunc := func(obj interface{}) error {
updateFunc := func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
// Update state to attaching, Initialize finalizer and add label to the object
_, derr := updateState(azv, azdiskv1beta2.Attaching, normalUpdate)
Expand Down Expand Up @@ -248,13 +249,14 @@ func (r *ReconcileAttachDetach) triggerAttach(ctx context.Context, azVolumeAttac
}
}

updateFunc := func(obj interface{}) error {
updateFunc := func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
azv = updateError(azv, attachErr)
_, uerr := updateState(azv, azdiskv1beta2.AttachmentFailed, forceUpdate)
return uerr
}
_ = azureutils.UpdateCRIWithRetry(goCtx, nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolumeAttachment, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus)
//nolint:contextcheck // final status update of the CRI must occur even when the current context's deadline passes.
_ = azureutils.UpdateCRIWithRetry(context.Background(), nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolumeAttachment, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus)
}
handleSuccess = func(asyncComplete bool) {
// Publish event to indicate attachment success
Expand All @@ -264,7 +266,7 @@ func (r *ReconcileAttachDetach) triggerAttach(ctx context.Context, azVolumeAttac
}
}

updateFunc := func(obj interface{}) error {
updateFunc := func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
azv = updateStatusDetail(azv, publishCtx)
var uerr error
Expand All @@ -277,7 +279,12 @@ func (r *ReconcileAttachDetach) triggerAttach(ctx context.Context, azVolumeAttac
if asyncComplete && azVolumeAttachment.Spec.RequestedRole == azdiskv1beta2.PrimaryRole {
_ = r.updateVolumeAttachmentWithResult(goCtx, azVolumeAttachment)
}
_ = azureutils.UpdateCRIWithRetry(goCtx, nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolumeAttachment, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus)
updateCtx := goCtx
if asyncComplete {
//nolint:contextcheck // final status update of the CRI must occur even when the current context's deadline passes.
updateCtx = context.Background()
}
_ = azureutils.UpdateCRIWithRetry(updateCtx, nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolumeAttachment, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus)
}

attachAndUpdate()
Expand All @@ -300,7 +307,7 @@ func (r *ReconcileAttachDetach) triggerDetach(ctx context.Context, azVolumeAttac
return getOperationRequeueError("detach", azVolumeAttachment)
}

updateFunc := func(obj interface{}) error {
updateFunc := func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
// Update state to detaching
_, derr := updateState(azv, azdiskv1beta2.Detaching, normalUpdate)
Expand All @@ -323,30 +330,30 @@ func (r *ReconcileAttachDetach) triggerDetach(ctx context.Context, azVolumeAttac
cloudCtx, cloudCancel := context.WithTimeout(goCtx, cloudTimeout)
defer cloudCancel()

var updateFunc func(obj interface{}) error
var updateFunc func(obj client.Object) error
updateMode := azureutils.UpdateCRIStatus
detachErr = r.detachVolume(cloudCtx, azVolumeAttachment.Spec.VolumeID, azVolumeAttachment.Spec.NodeName)
if detachErr != nil {
updateFunc = func(obj interface{}) error {
updateFunc = func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
azv = updateError(azv, detachErr)
_, derr := updateState(azv, azdiskv1beta2.DetachmentFailed, forceUpdate)
return derr
}
} else {
updateFunc = func(obj interface{}) error {
updateFunc = func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
_ = r.deleteFinalizer(azv)
return nil
}
updateMode = azureutils.UpdateCRI
}
// UpdateCRIWithRetry should be called on a context w/o timeout when called in a separate goroutine as it is not going to be retriggered and leave the CRI in unrecoverable transient state instead.
_ = azureutils.UpdateCRIWithRetry(goCtx, nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolumeAttachment, updateFunc, consts.ForcedUpdateMaxNetRetry, updateMode)
//nolint:contextcheck // final status update of the CRI must occur even when the current context's deadline passes.
_ = azureutils.UpdateCRIWithRetry(context.Background(), nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolumeAttachment, updateFunc, consts.ForcedUpdateMaxNetRetry, updateMode)
}()
<-waitCh
} else {
updateFunc := func(obj interface{}) error {
updateFunc := func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
// delete finalizer
_ = r.deleteFinalizer(azv)
Expand All @@ -369,7 +376,7 @@ func (r *ReconcileAttachDetach) promote(ctx context.Context, azVolumeAttachment
return err
}
// initialize metadata and update status block
updateFunc := func(obj interface{}) error {
updateFunc := func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
_ = updateRole(azv, azdiskv1beta2.PrimaryRole)
return nil
Expand All @@ -387,7 +394,7 @@ func (r *ReconcileAttachDetach) demote(ctx context.Context, azVolumeAttachment *

w.Logger().Info("Demoting AzVolumeAttachment")
// initialize metadata and update status block
updateFunc := func(obj interface{}) error {
updateFunc := func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
delete(azv.Status.Annotations, consts.VolumeAttachmentKey)
_ = updateRole(azv, azdiskv1beta2.ReplicaRole)
Expand All @@ -407,7 +414,7 @@ func (r *ReconcileAttachDetach) updateVolumeAttachmentWithResult(ctx context.Con
return err
}

vaUpdateFunc := func(obj interface{}) error {
vaUpdateFunc := func(obj client.Object) error {
va := obj.(*storagev1.VolumeAttachment)
if azVolumeAttachment.Status.Detail != nil {
for key, value := range azVolumeAttachment.Status.Detail.PublishContext {
Expand Down Expand Up @@ -661,7 +668,7 @@ func (r *ReconcileAttachDetach) recoverAzVolumeAttachment(ctx context.Context, r
go func(azv azdiskv1beta2.AzVolumeAttachment, azvMap *sync.Map) {
defer wg.Done()
var targetState azdiskv1beta2.AzVolumeAttachmentAttachmentState
updateFunc := func(obj interface{}) error {
updateFunc := func(obj client.Object) error {
var err error
azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
// add a recover annotation to the CRI so that reconciliation can be triggered for the CRI even if CRI's current state == target state
Expand Down
47 changes: 24 additions & 23 deletions pkg/controller/azvolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"sigs.k8s.io/azuredisk-csi-driver/pkg/workflow"

consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down Expand Up @@ -145,7 +146,7 @@ func (r *ReconcileAzVolume) triggerCreate(ctx context.Context, azVolume *azdiskv
}

// update state
updateFunc := func(obj interface{}) error {
updateFunc := func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolume)
_, err := r.updateState(azv, azdiskv1beta2.VolumeCreating, normalUpdate)
return err
Expand All @@ -169,12 +170,12 @@ func (r *ReconcileAzVolume) triggerCreate(ctx context.Context, azVolume *azdiskv
cloudCtx, cloudCancel := context.WithTimeout(goCtx, cloudTimeout)
defer cloudCancel()

var updateFunc func(interface{}) error
var updateFunc azureutils.UpdateCRIFunc
var response *azdiskv1beta2.AzVolumeStatusDetail
response, createErr = r.createVolume(cloudCtx, azVolume)
updateMode := azureutils.UpdateCRIStatus
if createErr != nil {
updateFunc = func(obj interface{}) error {
updateFunc = func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolume)
azv = r.updateError(azv, createErr)
azv = r.deleteFinalizer(azv, map[string]bool{consts.AzVolumeFinalizer: true})
Expand All @@ -185,7 +186,7 @@ func (r *ReconcileAzVolume) triggerCreate(ctx context.Context, azVolume *azdiskv
} else {
// create operation queue for the volume
r.controllerSharedState.createOperationQueue(azVolume.Name)
updateFunc = func(obj interface{}) error {
updateFunc = func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolume)
if response == nil {
return status.Errorf(codes.Internal, "non-nil AzVolumeStatusDetail expected but nil given")
Expand All @@ -196,8 +197,8 @@ func (r *ReconcileAzVolume) triggerCreate(ctx context.Context, azVolume *azdiskv
}
}

// UpdateCRIWithRetry should be called on a context w/o timeout when called in a separate goroutine as it is not going to be retriggered and leave the CRI in unrecoverable transient state instead.
_ = azureutils.UpdateCRIWithRetry(goCtx, nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolume, updateFunc, consts.ForcedUpdateMaxNetRetry, updateMode)
//nolint:contextcheck // final status update of the CRI must occur even when the current context's deadline passes.
_ = azureutils.UpdateCRIWithRetry(context.Background(), nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolume, updateFunc, consts.ForcedUpdateMaxNetRetry, updateMode)
}()

// wait for the workflow in goroutine to be created
Expand Down Expand Up @@ -235,7 +236,7 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *azdiskv
if _, ok := r.stateLock.LoadOrStore(azVolume.Name, nil); ok {
return getOperationRequeueError("delete", azVolume)
}
updateFunc := func(obj interface{}) error {
updateFunc := func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolume)
_, derr := r.updateState(azv, azdiskv1beta2.VolumeDeleting, normalUpdate)
return derr
Expand Down Expand Up @@ -269,15 +270,15 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *azdiskv
return derr
}

var updateFunc func(interface{}) error
var updateFunc azureutils.UpdateCRIFunc
var err error
updateMode := azureutils.UpdateCRIStatus

// Delete all AzVolumeAttachment objects bound to the deleted AzVolume
var attachments []azdiskv1beta2.AzVolumeAttachment
attachments, err = r.controllerSharedState.cleanUpAzVolumeAttachmentByVolume(deleteCtx, azVolume.Name, azvolume, all, mode)
if err != nil {
updateFunc = func(obj interface{}) error {
updateFunc = func(obj client.Object) error {
return reportError(obj, err)
}
} else {
Expand All @@ -289,7 +290,7 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *azdiskv
for i, attachment := range attachments {
waiter, err := r.controllerSharedState.conditionWatcher.NewConditionWaiter(deleteCtx, watcher.AzVolumeAttachmentType, attachment.Name, verifyObjectDeleted)
if err != nil {
updateFunc = func(obj interface{}) error {
updateFunc = func(obj client.Object) error {
return reportError(obj, err)
}
break
Expand Down Expand Up @@ -319,7 +320,7 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *azdiskv
}
}
err = status.Errorf(codes.Internal, strings.Join(errMsgs, ", "))
updateFunc = func(obj interface{}) error {
updateFunc = func(obj client.Object) error {
return reportError(obj, err)
}
}
Expand All @@ -333,28 +334,28 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *azdiskv
deleteErr = r.deleteVolume(cloudCtx, azVolume)
}
if deleteErr != nil {
updateFunc = func(obj interface{}) error {
updateFunc = func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolume)
azv = r.updateError(azv, deleteErr)
_, derr := r.updateState(azv, azdiskv1beta2.VolumeDeletionFailed, forceUpdate)
return derr
}
} else {
updateMode = azureutils.UpdateAll
updateFunc = func(obj interface{}) error {
updateFunc = func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolume)
_ = r.deleteFinalizer(azv, map[string]bool{consts.AzVolumeFinalizer: true})
return nil
}
}
}

// UpdateCRIWithRetry should be called on a context w/o timeout when called in a separate goroutine as it is not going to be retriggered and leave the CRI in unrecoverable transient state instead.
_ = azureutils.UpdateCRIWithRetry(goCtx, nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolume, updateFunc, consts.ForcedUpdateMaxNetRetry, updateMode)
//nolint:contextcheck // final status update of the CRI must occur even when the current context's deadline passes.
_ = azureutils.UpdateCRIWithRetry(context.Background(), nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolume, updateFunc, consts.ForcedUpdateMaxNetRetry, updateMode)
}()
<-waitCh
} else {
updateFunc := func(obj interface{}) error {
updateFunc := func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolume)
_ = r.deleteFinalizer(azv, map[string]bool{consts.AzVolumeFinalizer: true})
return nil
Expand All @@ -377,7 +378,7 @@ func (r *ReconcileAzVolume) triggerUpdate(ctx context.Context, azVolume *azdiskv
err = getOperationRequeueError("update", azVolume)
return err
}
updateFunc := func(obj interface{}) error {
updateFunc := func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolume)
_, derr := r.updateState(azv, azdiskv1beta2.VolumeUpdating, normalUpdate)
return derr
Expand All @@ -399,18 +400,18 @@ func (r *ReconcileAzVolume) triggerUpdate(ctx context.Context, azVolume *azdiskv
cloudCtx, cloudCancel := context.WithTimeout(goCtx, cloudTimeout)
defer cloudCancel()

var updateFunc func(interface{}) error
var updateFunc azureutils.UpdateCRIFunc
var response *azdiskv1beta2.AzVolumeStatusDetail
response, updateErr = r.expandVolume(cloudCtx, azVolume)
if updateErr != nil {
updateFunc = func(obj interface{}) error {
updateFunc = func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolume)
azv = r.updateError(azv, updateErr)
_, derr := r.updateState(azv, azdiskv1beta2.VolumeUpdateFailed, forceUpdate)
return derr
}
} else {
updateFunc = func(obj interface{}) error {
updateFunc = func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolume)
if response == nil {
return status.Errorf(codes.Internal, "non-nil AzVolumeStatusDetail expected but nil given")
Expand All @@ -421,8 +422,8 @@ func (r *ReconcileAzVolume) triggerUpdate(ctx context.Context, azVolume *azdiskv
}
}

// UpdateCRIWithRetry should be called on a context w/o timeout when called in a separate goroutine as it is not going to be retriggered and leave the CRI in unrecoverable transient state instead.
_ = azureutils.UpdateCRIWithRetry(goCtx, nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolume, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus)
//nolint:contextcheck // final status update of the CRI must occur even when the current context's deadline passes.
_ = azureutils.UpdateCRIWithRetry(context.Background(), nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolume, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus)
}()
<-waitCh

Expand Down Expand Up @@ -555,7 +556,7 @@ func (r *ReconcileAzVolume) recoverAzVolume(ctx context.Context, recoveredAzVolu
go func(azv azdiskv1beta2.AzVolume, azvMap *sync.Map) {
defer wg.Done()
var targetState azdiskv1beta2.AzVolumeState
updateFunc := func(obj interface{}) error {
updateFunc := func(obj client.Object) error {
var err error
azv := obj.(*azdiskv1beta2.AzVolume)
// add a recover annotation to the CRI so that reconciliation can be triggered for the CRI even if CRI's current state == target state
Expand Down
11 changes: 6 additions & 5 deletions pkg/controller/pv.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
"sigs.k8s.io/azuredisk-csi-driver/pkg/workflow"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -114,7 +115,7 @@ func (r *ReconcilePV) Reconcile(ctx context.Context, request reconcile.Request)
}
// AzVolume does exist and needs to be deleted
// add annotation to mark AzVolumeAttachment cleanup
updateFunc := func(obj interface{}) error {
updateFunc := func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolume)
azv.Status.Annotations = azureutils.AddToMap(azv.Status.Annotations, consts.PreProvisionedVolumeCleanupAnnotation, "true")
return nil
Expand Down Expand Up @@ -156,10 +157,10 @@ func (r *ReconcilePV) Reconcile(ctx context.Context, request reconcile.Request)
return reconcileReturnOnSuccess(pv.Name, r.controllerRetryInfo)
}

var azVolumeUpdateFunc func(interface{}) error
var azVolumeUpdateFunc azureutils.UpdateCRIFunc

if azVolume.Spec.PersistentVolume != pv.Name {
azVolumeUpdateFunc = func(obj interface{}) error {
azVolumeUpdateFunc = func(obj client.Object) error {
azVolume := obj.(*azdiskv1beta2.AzVolume)
azVolume.Spec.PersistentVolume = pv.Name
return nil
Expand All @@ -171,15 +172,15 @@ func (r *ReconcilePV) Reconcile(ctx context.Context, request reconcile.Request)
pvcNamespace, pvcNamespaceLabelExists := azureutils.GetFromMap(azVolume.Labels, consts.PvcNamespaceLabel)
if pv.Spec.ClaimRef != nil {
if !pvcLabelExists || pv.Spec.ClaimRef.Name != pvcName || !pvcNamespaceLabelExists || pv.Spec.ClaimRef.Namespace != pvcNamespace {
azureutils.AppendToUpdateFunc(azVolumeUpdateFunc, func(obj interface{}) error {
azVolumeUpdateFunc = azureutils.AppendToUpdateCRIFunc(azVolumeUpdateFunc, func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolume)
azv.Labels = azureutils.AddToMap(azv.Labels, consts.PvcNameLabel, pv.Spec.ClaimRef.Name, consts.PvcNamespaceLabel, pv.Spec.ClaimRef.Namespace)
return nil
})
}
} else {
if pvcLabelExists || pvcNamespaceLabelExists {
azureutils.AppendToUpdateFunc(azVolumeUpdateFunc, func(obj interface{}) error {
azVolumeUpdateFunc = azureutils.AppendToUpdateCRIFunc(azVolumeUpdateFunc, func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolume)
delete(azv.Labels, consts.PvcNameLabel)
delete(azv.Labels, consts.PvcNamespaceLabel)
Expand Down
Loading

0 comments on commit a91aecf

Please sign in to comment.