Skip to content

Commit

Permalink
chore: DeleteVolume should wait for replicas to be fully deleted inst…
Browse files Browse the repository at this point in the history
…ead of requeuing
  • Loading branch information
sunpa93 committed Apr 28, 2022
1 parent 04aba1e commit 4396fa1
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 123 deletions.
1 change: 1 addition & 0 deletions pkg/azureconstants/azure_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ const (
CRIUpdateRetryDuration = time.Duration(1) * time.Second
CRIUpdateRetryFactor = 3.0
CRIUpdateRetryStep = 5
DefaultInformerResync = time.Duration(30) * time.Second
ZonedField = "zoned"
NormalUpdateMaxNetRetry = 0
ForcedUpdateMaxNetRetry = 5
Expand Down
107 changes: 81 additions & 26 deletions pkg/controller/azvolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
diskv1beta1 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta1"
"sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
util "sigs.k8s.io/azuredisk-csi-driver/pkg/util"
"sigs.k8s.io/azuredisk-csi-driver/pkg/watcher"
"sigs.k8s.io/azuredisk-csi-driver/pkg/workflow"

consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
Expand Down Expand Up @@ -226,18 +227,6 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *diskv1b
}
}()

// Delete all AzVolumeAttachment objects bound to the deleted AzVolume
var attachments []diskv1beta1.AzVolumeAttachment
attachments, err = r.controllerSharedState.cleanUpAzVolumeAttachmentByVolume(ctx, azVolume.Name, azvolume, all, mode)
if err != nil {
return err
}

if len(attachments) > 0 {
err = status.Errorf(codes.Aborted, "volume deletion requeued until attached azVolumeAttachments are entirely detached...")
return err
}

// only try deleting underlying volume 1) if volume creation was successful and 2) volumeDeleteRequestAnnotation is present
// if the annotation is not present, only delete the CRI and not the underlying volume
if isCreated(azVolume) && volumeDeleteRequested {
Expand Down Expand Up @@ -266,29 +255,95 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *diskv1b
waitCh <- goSignal{}

goCtx := goWorkflow.SaveToContext(context.Background())
cloudCtx, cloudCancel := context.WithTimeout(goCtx, cloudTimeout)
defer cloudCancel()

deleteCtx, deleteCancel := context.WithTimeout(goCtx, cloudTimeout)
defer deleteCancel()

reportError := func(obj interface{}, err error) error {
azv := obj.(*diskv1beta1.AzVolume)
_ = r.updateError(azv, err)
_, derr := r.updateState(azv, diskv1beta1.VolumeDeletionFailed, forceUpdate)
return derr
}

var updateFunc func(interface{}) error
var err error
updateMode := azureutils.UpdateCRIStatus
deleteErr = r.deleteVolume(cloudCtx, azVolume)
if deleteErr != nil {

// Delete all AzVolumeAttachment objects bound to the deleted AzVolume
var attachments []diskv1beta1.AzVolumeAttachment
attachments, err = r.controllerSharedState.cleanUpAzVolumeAttachmentByVolume(deleteCtx, azVolume.Name, azvolume, all, mode)
if err != nil {
updateFunc = func(obj interface{}) error {
azv := obj.(*diskv1beta1.AzVolume)
azv = r.updateError(azv, deleteErr)
_, derr := r.updateState(azv, diskv1beta1.VolumeDeletionFailed, forceUpdate)
return derr
return reportError(obj, err)
}
} else {
updateFunc = func(obj interface{}) error {
azv := obj.(*diskv1beta1.AzVolume)
azv = r.deleteFinalizer(azv, map[string]bool{consts.AzVolumeFinalizer: true})
_, derr := r.updateState(azv, diskv1beta1.VolumeDeleted, forceUpdate)
return derr
var wg sync.WaitGroup
errors := make([]error, len(attachments))
numErrors := uint32(0)

// start waiting for replica AzVolumeAttachment CRIs to be deleted
for i, attachment := range attachments {
waiter, err := r.controllerSharedState.conditionWatcher.NewConditionWaiter(deleteCtx, watcher.AzVolumeAttachmentType, attachment.Name, verifyObjectDeleted)
if err != nil {
updateFunc = func(obj interface{}) error {
return reportError(obj, err)
}
break
}

// wait async and report error to go channel
wg.Add(1)
go func(ctx context.Context, waiter *watcher.ConditionWaiter, i int) {
defer waiter.Close()
defer wg.Done()
_, err := waiter.Wait(ctx)
if err != nil {
errors[i] = err
atomic.AddUint32(&numErrors, 1)
}
}(deleteCtx, waiter, i)
}
updateMode = azureutils.UpdateAll

wg.Wait()

// if errors have been found with the wait calls, format the error msg and report via CRI
if numErrors > 0 {
var errMsgs []string
for i, derr := range errors {
if derr != nil {
errMsgs = append(errMsgs, fmt.Sprintf("%s: %v", attachments[i].Name, derr))
}
}
err = status.Errorf(codes.Internal, strings.Join(errMsgs, ", "))
updateFunc = func(obj interface{}) error {
return reportError(obj, err)
}
}
}

if err == nil {
cloudCtx, cloudCancel := context.WithTimeout(goCtx, cloudTimeout)
defer cloudCancel()

deleteErr = r.deleteVolume(cloudCtx, azVolume)
if deleteErr != nil {
updateFunc = func(obj interface{}) error {
azv := obj.(*diskv1beta1.AzVolume)
azv = r.updateError(azv, deleteErr)
_, derr := r.updateState(azv, diskv1beta1.VolumeDeletionFailed, forceUpdate)
return derr
}
} else {
updateFunc = func(obj interface{}) error {
azv := obj.(*diskv1beta1.AzVolume)
azv = r.deleteFinalizer(azv, map[string]bool{consts.AzVolumeFinalizer: true})
_, derr := r.updateState(azv, diskv1beta1.VolumeDeleted, forceUpdate)
return derr
}
}
}

// UpdateCRIWithRetry should be called on a context w/o timeout when called in a separate goroutine as it is not going to be retriggered and leave the CRI in unrecoverable transient state instead.
_ = azureutils.UpdateCRIWithRetry(goCtx, nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolume, updateFunc, consts.ForcedUpdateMaxNetRetry, updateMode)
}()
Expand Down
31 changes: 21 additions & 10 deletions pkg/controller/azvolume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"context"
"sync"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
fakev1 "k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2/klogr"
Expand All @@ -35,6 +37,7 @@ import (
"sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
"sigs.k8s.io/azuredisk-csi-driver/pkg/controller/mockclient"
"sigs.k8s.io/azuredisk-csi-driver/pkg/controller/mockvolumeprovisioner"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

Expand Down Expand Up @@ -218,7 +221,7 @@ func TestAzVolumeControllerReconcile(t *testing.T) {
},
},
{
description: "[Failure] Should delete volume attachments and requeue when AzVolume is marked for deletion.",
description: "[Success] Should delete replica volume attachments and delete AzVolume respectively",
request: testAzVolume0Request,
setupFunc: func(t *testing.T, mockCtl *gomock.Controller) *ReconcileAzVolume {
azVolume := testAzVolume0.DeepCopy()
Expand All @@ -239,7 +242,6 @@ func TestAzVolumeControllerReconcile(t *testing.T) {
mockCtl,
testNamespace,
azVolume,
&testPrimaryAzVolumeAttachment0,
&testReplicaAzVolumeAttachment)

mockClientsAndVolumeProvisioner(controller)
Expand All @@ -248,14 +250,23 @@ func TestAzVolumeControllerReconcile(t *testing.T) {
},
verifyFunc: func(t *testing.T, controller *ReconcileAzVolume, result reconcile.Result, err error) {
require.NoError(t, err)
require.Greater(t, result.RequeueAfter, time.Duration(0))

azVolume, err := controller.controllerSharedState.azClient.DiskV1beta1().AzVolumes(testNamespace).Get(context.TODO(), testPersistentVolume0Name, metav1.GetOptions{})
req, err := azureutils.CreateLabelRequirements(consts.VolumeNameLabel, selection.Equals, testPersistentVolume0Name)
require.NoError(t, err)
labelSelector := labels.NewSelector().Add(*req)
checkAzVolumeAttachmentDeletion := func() (bool, error) {
var attachments diskv1beta1.AzVolumeAttachmentList
err := controller.controllerSharedState.cachedClient.List(context.Background(), &attachments, &client.ListOptions{LabelSelector: labelSelector})
return len(attachments.Items) == 0, err
}
err = wait.PollImmediate(verifyCRIInterval, verifyCRITimeout, checkAzVolumeAttachmentDeletion)
require.NoError(t, err)
checkAzVolumeDeletion := func() (bool, error) {
var azVolume diskv1beta1.AzVolume
err := controller.controllerSharedState.cachedClient.Get(context.Background(), types.NamespacedName{Namespace: controller.controllerSharedState.objectNamespace, Name: testPersistentVolume0Name}, &azVolume)
return azVolume.Status.State == diskv1beta1.VolumeDeleted, err
}
err = wait.PollImmediate(verifyCRIInterval, verifyCRITimeout, checkAzVolumeDeletion)
require.NoError(t, err)
require.Equal(t, diskv1beta1.VolumeCreated, azVolume.Status.State)

azVolumeAttachments, _ := controller.controllerSharedState.azClient.DiskV1beta1().AzVolumeAttachments(testNamespace).List(context.TODO(), metav1.ListOptions{})
require.Len(t, azVolumeAttachments.Items, 0)
},
},
}
Expand Down
19 changes: 18 additions & 1 deletion pkg/controller/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ import (
"sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta1"
diskv1beta1 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta1"
azClientSet "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/client/clientset/versioned"
azurediskInformers "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/client/informers/externalversions"
consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
"sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
"sigs.k8s.io/azuredisk-csi-driver/pkg/util"
"sigs.k8s.io/azuredisk-csi-driver/pkg/watcher"
"sigs.k8s.io/azuredisk-csi-driver/pkg/workflow"

utilfeature "k8s.io/apiserver/pkg/util/feature"
Expand Down Expand Up @@ -270,10 +273,11 @@ type SharedState struct {
cachedClient client.Client
azClient azClientSet.Interface
kubeClient kubernetes.Interface
conditionWatcher *watcher.ConditionWatcher
}

func NewSharedState(driverName, objectNamespace, topologyKey string, eventRecorder record.EventRecorder, cachedClient client.Client, azClient azClientSet.Interface, kubeClient kubernetes.Interface) *SharedState {
newSharedState := &SharedState{driverName: driverName, objectNamespace: objectNamespace, topologyKey: topologyKey, eventRecorder: eventRecorder, cachedClient: cachedClient, azClient: azClient, kubeClient: kubeClient}
newSharedState := &SharedState{driverName: driverName, objectNamespace: objectNamespace, topologyKey: topologyKey, eventRecorder: eventRecorder, cachedClient: cachedClient, azClient: azClient, kubeClient: kubeClient, conditionWatcher: watcher.New(context.Background(), azClient, azurediskInformers.NewSharedInformerFactory(azClient, consts.DefaultInformerResync), objectNamespace)}
newSharedState.createReplicaRequestsQueue()
return newSharedState
}
Expand Down Expand Up @@ -2177,3 +2181,16 @@ func (c *SharedState) getNodesForReplica(ctx context.Context, volumeName string,

return filtered, nil
}

func verifyObjectDeleted(obj interface{}, objectDeleted bool) (bool, error) {
if obj == nil || objectDeleted {
return true, nil
}

// otherwise, the volume detachment has either failed with error or pending
azVolumeAttachmentInstance := obj.(*diskv1beta1.AzVolumeAttachment)
if azVolumeAttachmentInstance.Status.Error != nil {
return false, util.ErrorFromAzError(azVolumeAttachmentInstance.Status.Error)
}
return false, nil
}
19 changes: 4 additions & 15 deletions pkg/controller/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
diskv1beta1 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta1"
consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
"sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
"sigs.k8s.io/azuredisk-csi-driver/pkg/watcher"
"sigs.k8s.io/azuredisk-csi-driver/pkg/workflow"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
Expand All @@ -37,10 +37,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
)

const (
deletionPollingInterval = time.Duration(10) * time.Second
)

type ReconcileReplica struct {
logger logr.Logger
controllerSharedState *SharedState
Expand Down Expand Up @@ -101,16 +97,9 @@ func (r *ReconcileReplica) Reconcile(ctx context.Context, request reconcile.Requ
goCtx := context.Background()

// wait for replica AzVolumeAttachment deletion
conditionFunc := func() (bool, error) {
var tmp diskv1beta1.AzVolumeAttachment
err := r.controllerSharedState.cachedClient.Get(goCtx, request.NamespacedName, &tmp)
if errors.IsNotFound(err) {
return true, nil
}

return false, err
}
_ = wait.PollImmediateInfinite(deletionPollingInterval, conditionFunc)
waiter, _ := r.controllerSharedState.conditionWatcher.NewConditionWaiter(goCtx, watcher.AzVolumeAttachmentType, azVolumeAttachment.Name, verifyObjectDeleted)
defer waiter.Close()
_, _ = waiter.Wait(goCtx)

// add replica management operation to the queue
r.triggerManageReplica(goCtx, azVolumeAttachment.Spec.VolumeName)
Expand Down
Loading

0 comments on commit 4396fa1

Please sign in to comment.