Skip to content

Commit

Permalink
chore: DeleteVolume should wait for replicas to be fully deleted inst…
Browse files Browse the repository at this point in the history
…ead of requeuing
  • Loading branch information
sunpa93 committed Apr 26, 2022
1 parent 0cd8a55 commit 4e1dd51
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 94 deletions.
1 change: 1 addition & 0 deletions pkg/azureconstants/azure_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ const (
CRIUpdateRetryDuration = time.Duration(1) * time.Second
CRIUpdateRetryFactor = 3.0
CRIUpdateRetryStep = 5
DefaultInformerResync = time.Duration(30) * time.Second
ZonedField = "zoned"
NormalUpdateMaxNetRetry = 0
ForcedUpdateMaxNetRetry = 5
Expand Down
118 changes: 93 additions & 25 deletions pkg/controller/azvolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
diskv1beta1 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta1"
"sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
util "sigs.k8s.io/azuredisk-csi-driver/pkg/util"
"sigs.k8s.io/azuredisk-csi-driver/pkg/watcher"

consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -208,16 +209,6 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *diskv1b
}
}()

// Delete all AzVolumeAttachment objects bound to the deleted AzVolume
attachments, err := r.controllerSharedState.cleanUpAzVolumeAttachmentByVolume(ctx, azVolume.Name, azvolume, all, mode)
if err != nil {
return err
}

if len(attachments) > 0 {
return status.Errorf(codes.Aborted, "volume deletion requeued until attached azVolumeAttachments are entirely detached...")
}

// only try deleting underlying volume 1) if volume creation was successful and 2) volumeDeleteRequestAnnotation is present
// if the annotation is not present, only delete the CRI and not the underlying volume
if isCreated(azVolume) && volumeDeleteRequested {
Expand All @@ -239,31 +230,108 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *diskv1b
}

go func() {
cloudCtx, cloudCancel := context.WithTimeout(context.Background(), cloudTimeout)
defer cloudCancel()
deleteCtx, deleteCancel := context.WithTimeout(context.Background(), cloudTimeout)
defer deleteCancel()

reportError := func(obj interface{}, err error) error {
azv := obj.(*diskv1beta1.AzVolume)
_ = r.updateError(azv, err)
_, derr := r.updateState(azv, diskv1beta1.VolumeDeletionFailed, forceUpdate)
return derr
}

var updateFunc func(interface{}) error
var err error
updateMode := azureutils.UpdateCRIStatus
err := r.deleteVolume(cloudCtx, azVolume)

// Delete all AzVolumeAttachment objects bound to the deleted AzVolume
var attachments []diskv1beta1.AzVolumeAttachment
attachments, err = r.controllerSharedState.cleanUpAzVolumeAttachmentByVolume(deleteCtx, azVolume.Name, azvolume, all, mode)
if err != nil {
klog.Errorf("failed to delete volume (%s): %v", azVolume.Spec.VolumeName, err)
updateFunc = func(obj interface{}) error {
azv := obj.(*diskv1beta1.AzVolume)
azv = r.updateError(azv, err)
_, derr := r.updateState(azv, diskv1beta1.VolumeDeletionFailed, forceUpdate)
return derr
return reportError(obj, err)
}
} else {
klog.Infof("successfully deleted volume (%s)", azVolume.Spec.VolumeName)
updateFunc = func(obj interface{}) error {
azv := obj.(*diskv1beta1.AzVolume)
azv = r.deleteFinalizer(azv, map[string]bool{consts.AzVolumeFinalizer: true})
_, derr := r.updateState(azv, diskv1beta1.VolumeDeleted, forceUpdate)
return derr
var wg sync.WaitGroup
errors := make([]error, len(attachments))
numErrors := uint32(0)

verifyDeleted := func(obj interface{}, objectDeleted bool) (bool, error) {
if obj == nil || objectDeleted {
return true, nil
}

// otherwise, the volume detachment has either failed with error or pending
azVolumeAttachmentInstance := obj.(*diskv1beta1.AzVolumeAttachment)
if azVolumeAttachmentInstance.Status.Error != nil {
return false, util.ErrorFromAzError(azVolumeAttachmentInstance.Status.Error)
}
return false, nil
}

// start waiting for replica AzVolumeAttachment CRIs to be deleted
for i, attachment := range attachments {
waiter, err := r.controllerSharedState.conditionWatcher.NewConditionWaiter(deleteCtx, watcher.AzVolumeAttachmentType, attachment.Name, verifyDeleted)
if err != nil {
updateFunc = func(obj interface{}) error {
return reportError(obj, err)
}
break
}

// wait async and report error to go channel
wg.Add(1)
go func(ctx context.Context, waiter *watcher.ConditionWaiter, i int) {
defer waiter.Close()
defer wg.Done()
_, err := waiter.Wait(ctx)
if err != nil {
errors[i] = err
atomic.AddUint32(&numErrors, 1)
}
}(deleteCtx, waiter, i)
}

wg.Wait()

// if errors have been found with the wait calls, format the error msg and report via CRI
if numErrors > 0 {
var errMsgs []string
for i, derr := range errors {
if derr != nil {
errMsgs = append(errMsgs, fmt.Sprintf("%s: %v", attachments[i].Name, derr))
}
}
err = status.Errorf(codes.Internal, strings.Join(errMsgs, ", "))
updateFunc = func(obj interface{}) error {
return reportError(obj, err)
}
}
updateMode = azureutils.UpdateAll
}

if err == nil {
cloudCtx, cloudCancel := context.WithTimeout(context.Background(), cloudTimeout)
defer cloudCancel()

err := r.deleteVolume(cloudCtx, azVolume)
if err != nil {
klog.Errorf("failed to delete volume (%s): %v", azVolume.Spec.VolumeName, err)
updateFunc = func(obj interface{}) error {
return reportError(obj, err)
}
} else {
klog.Infof("successfully deleted volume (%s)", azVolume.Spec.VolumeName)
updateFunc = func(obj interface{}) error {
azv := obj.(*diskv1beta1.AzVolume)
azv = r.deleteFinalizer(azv, map[string]bool{consts.AzVolumeFinalizer: true})
_, derr := r.updateState(azv, diskv1beta1.VolumeDeleted, forceUpdate)
return derr
}
updateMode = azureutils.UpdateAll

}
}

// UpdateCRIWithRetry should be called on a context w/o timeout when called in a separate goroutine as it is not going to be retriggered and leave the CRI in unrecoverable transient state instead.
updateCtx := context.Background()
_ = azureutils.UpdateCRIWithRetry(updateCtx, nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolume, updateFunc, consts.ForcedUpdateMaxNetRetry, updateMode)
Expand Down
31 changes: 21 additions & 10 deletions pkg/controller/azvolume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"context"
"sync"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
fakev1 "k8s.io/client-go/kubernetes/fake"
diskv1beta1 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta1"
Expand All @@ -34,6 +36,7 @@ import (
"sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
"sigs.k8s.io/azuredisk-csi-driver/pkg/controller/mockclient"
"sigs.k8s.io/azuredisk-csi-driver/pkg/controller/mockvolumeprovisioner"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

Expand Down Expand Up @@ -216,7 +219,7 @@ func TestAzVolumeControllerReconcile(t *testing.T) {
},
},
{
description: "[Failure] Should delete volume attachments and requeue when AzVolume is marked for deletion.",
description: "[Success] Should delete replica volume attachments and delete AzVolume respectively",
request: testAzVolume0Request,
setupFunc: func(t *testing.T, mockCtl *gomock.Controller) *ReconcileAzVolume {
azVolume := testAzVolume0.DeepCopy()
Expand All @@ -237,7 +240,6 @@ func TestAzVolumeControllerReconcile(t *testing.T) {
mockCtl,
testNamespace,
azVolume,
&testPrimaryAzVolumeAttachment0,
&testReplicaAzVolumeAttachment)

mockClientsAndVolumeProvisioner(controller)
Expand All @@ -246,14 +248,23 @@ func TestAzVolumeControllerReconcile(t *testing.T) {
},
verifyFunc: func(t *testing.T, controller *ReconcileAzVolume, result reconcile.Result, err error) {
require.NoError(t, err)
require.Greater(t, result.RequeueAfter, time.Duration(0))

azVolume, err := controller.controllerSharedState.azClient.DiskV1beta1().AzVolumes(testNamespace).Get(context.TODO(), testPersistentVolume0Name, metav1.GetOptions{})
req, err := azureutils.CreateLabelRequirements(consts.VolumeNameLabel, selection.Equals, testPersistentVolume0Name)
require.NoError(t, err)
labelSelector := labels.NewSelector().Add(*req)
checkAzVolumeAttachmentDeletion := func() (bool, error) {
var attachments diskv1beta1.AzVolumeAttachmentList
err := controller.controllerSharedState.cachedClient.List(context.Background(), &attachments, &client.ListOptions{LabelSelector: labelSelector})
return len(attachments.Items) == 0, err
}
err = wait.PollImmediate(verifyCRIInterval, verifyCRITimeout, checkAzVolumeAttachmentDeletion)
require.NoError(t, err)
checkAzVolumeDeletion := func() (bool, error) {
var azVolume diskv1beta1.AzVolume
err := controller.controllerSharedState.cachedClient.Get(context.Background(), types.NamespacedName{Namespace: controller.controllerSharedState.objectNamespace, Name: testPersistentVolume0Name}, &azVolume)
return azVolume.Status.State == diskv1beta1.VolumeDeleted, err
}
err = wait.PollImmediate(verifyCRIInterval, verifyCRITimeout, checkAzVolumeDeletion)
require.NoError(t, err)
require.Equal(t, diskv1beta1.VolumeCreated, azVolume.Status.State)

azVolumeAttachments, _ := controller.controllerSharedState.azClient.DiskV1beta1().AzVolumeAttachments(testNamespace).List(context.TODO(), metav1.ListOptions{})
require.Len(t, azVolumeAttachments.Items, 0)
},
},
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ import (
"k8s.io/klog/v2"
diskv1beta1 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta1"
azClientSet "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/client/clientset/versioned"
azurediskInformers "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/client/informers/externalversions"
consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
"sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
"sigs.k8s.io/azuredisk-csi-driver/pkg/watcher"

utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -266,10 +268,11 @@ type SharedState struct {
cachedClient client.Client
azClient azClientSet.Interface
kubeClient kubernetes.Interface
conditionWatcher *watcher.ConditionWatcher
}

func NewSharedState(driverName, objectNamespace, topologyKey string, eventRecorder record.EventRecorder, cachedClient client.Client, azClient azClientSet.Interface, kubeClient kubernetes.Interface) *SharedState {
newSharedState := &SharedState{driverName: driverName, objectNamespace: objectNamespace, topologyKey: topologyKey, eventRecorder: eventRecorder, cachedClient: cachedClient, azClient: azClient, kubeClient: kubeClient}
newSharedState := &SharedState{driverName: driverName, objectNamespace: objectNamespace, topologyKey: topologyKey, eventRecorder: eventRecorder, cachedClient: cachedClient, azClient: azClient, kubeClient: kubeClient, conditionWatcher: watcher.NewConditionWatcher(context.Background(), azClient, azurediskInformers.NewSharedInformerFactory(azClient, consts.DefaultInformerResync), objectNamespace)}
newSharedState.createReplicaRequestsQueue()
return newSharedState
}
Expand Down
Loading

0 comments on commit 4e1dd51

Please sign in to comment.