Skip to content

Commit

Permalink
fix: prevent race between controller reconciler and controller recove…
Browse files Browse the repository at this point in the history
…ry operation
  • Loading branch information
sunpa93 committed Mar 17, 2022
1 parent 94da92b commit b13ee46
Show file tree
Hide file tree
Showing 10 changed files with 38 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/azuredisk/azuredisk_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ func (d *DriverV2) StartControllersAndDieOnExit(ctx context.Context) {
if err := podReconciler.Recover(ctx); err != nil {
klog.Warningf("Failed to recover replica AzVolumeAttachments: %v.", err)
}
sharedState.MarkRecoveryComplete()
}()

klog.V(2).Info("Starting controller manager")
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/attach_detach.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ var allowedTargetAttachmentStates = map[string][]string{
}

func (r *ReconcileAttachDetach) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
if !r.controllerSharedState.isRecoveryComplete() {
return reconcile.Result{Requeue: true}, nil
}

azVolumeAttachment, err := azureutils.GetAzVolumeAttachment(ctx, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, request.Name, request.Namespace, true)
// if object is not found, it means the object has been deleted. Log the deletion and do not requeue
if errors.IsNotFound(err) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/azdrivernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type ReconcileAzDriverNode struct {
var _ reconcile.Reconciler = &ReconcileAzDriverNode{}

func (r *ReconcileAzDriverNode) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
if !r.controllerSharedState.isRecoveryComplete() {
return reconcile.Result{Requeue: true}, nil
}

klog.V(2).Info("Checking to see if node (%v) exists.", request.NamespacedName)
n := &corev1.Node{}
err := r.controllerSharedState.cachedClient.Get(ctx, request.NamespacedName, n)
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/azvolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ var allowedTargetVolumeStates = map[string][]string{
}

func (r *ReconcileAzVolume) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
if !r.controllerSharedState.isRecoveryComplete() {
return reconcile.Result{Requeue: true}, nil
}

azVolume, err := azureutils.GetAzVolume(ctx, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, request.Name, request.Namespace, true)
if err != nil {
// if AzVolume has been deleted, delete the operation queue for the volume and return success
Expand Down
9 changes: 9 additions & 0 deletions pkg/controller/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func newLockableEntry(entry interface{}) *lockableEntry {
}

type SharedState struct {
recoveryComplete uint32
driverName string
objectNamespace string
topologyKey string
Expand All @@ -262,6 +263,14 @@ func NewSharedState(driverName, objectNamespace, topologyKey string, eventRecord
return newSharedState
}

func (c *SharedState) isRecoveryComplete() bool {
return c.recoveryComplete == 1
}

func (c *SharedState) MarkRecoveryComplete() {
atomic.StoreUint32(&c.recoveryComplete, 1)
}

func (c *SharedState) createOperationQueue(volumeName string) {
_, _ = c.volumeOperationQueues.LoadOrStore(volumeName, newLockableEntry(newOperationQueue()))
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ func createPod(podNamespace, podName string, pvcs []string) v1.Pod {

func initState(client client.Client, azClient azClientSet.Interface, kubeClient kubernetes.Interface, objs ...runtime.Object) (c *SharedState) {
c = NewSharedState(consts.DefaultDriverName, consts.DefaultAzureDiskCrdNamespace, consts.WellKnownTopologyKey, &record.FakeRecorder{}, client, azClient, kubeClient)
c.MarkRecoveryComplete()

for _, obj := range objs {
switch target := obj.(type) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/node_availability.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type ReconcileNodeAvailability struct {
var _ reconcile.Reconciler = &ReconcileNodeAvailability{}

func (r *ReconcileNodeAvailability) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
if !r.controllerSharedState.isRecoveryComplete() {
return reconcile.Result{Requeue: true}, nil
}

n := &corev1.Node{}
err := r.controllerSharedState.cachedClient.Get(ctx, request.NamespacedName, n)
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ type ReconcilePod struct {
var _ reconcile.Reconciler = &ReconcilePod{}

func (r *ReconcilePod) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
if !r.controllerSharedState.isRecoveryComplete() {
return reconcile.Result{Requeue: true}, nil
}

var pod corev1.Pod
klog.V(5).Infof("Reconcile pod %s.", request.Name)
podKey := getQualifiedName(request.Namespace, request.Name)
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/pv.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type ReconcilePV struct {
var _ reconcile.Reconciler = &ReconcilePV{}

func (r *ReconcilePV) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
if !r.controllerSharedState.isRecoveryComplete() {
return reconcile.Result{Requeue: true}, nil
}

var pv corev1.PersistentVolume
var azVolume diskv1alpha2.AzVolume
// Ignore not found errors as they cannot be fixed by a requeue
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ type ReconcileReplica struct {
var _ reconcile.Reconciler = &ReconcileReplica{}

func (r *ReconcileReplica) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
if !r.controllerSharedState.isRecoveryComplete() {
return reconcile.Result{Requeue: true}, nil
}

azVolumeAttachment, err := azureutils.GetAzVolumeAttachment(ctx, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, request.Name, request.Namespace, true)
if errors.IsNotFound(err) {
klog.Infof("AzVolumeAttachment (%s) has been successfully deleted.", request.Name)
Expand Down

0 comments on commit b13ee46

Please sign in to comment.