Skip to content

Commit

Permalink
use waiter for triggerCreateFailedReplica
Browse files Browse the repository at this point in the history
  • Loading branch information
sunpa93 committed May 25, 2022
1 parent af50295 commit 7490690
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 22 deletions.
12 changes: 9 additions & 3 deletions pkg/controller/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1214,12 +1214,15 @@ func (s *scoreByNodeCapacity) score(ctx context.Context, nodeScores map[string]i
// if failed to get node's remaining capacity, remove the node from the candidate list and proceed
w.Logger().Errorf(err, "failed to get remaining capacity of node (%s)", nodeName)
delete(nodeScores, nodeName)
continue
}
if remainingCapacity-len(s.volumes) <= 0 {

nodeScores[nodeName] = score + (nodeScoreLowCoefficient * remainingCapacity)

if remainingCapacity-len(s.volumes) < 0 {
delete(nodeScores, nodeName)
}

nodeScores[nodeName] = score + (nodeScoreLowCoefficient * remainingCapacity)
w.Logger().V(5).Infof("node (%s) can accept %d more attachments", nodeName, remainingCapacity)
}
return nodeScores, nil
Expand Down Expand Up @@ -1363,17 +1366,19 @@ func (c *SharedState) prioritizeNodes(ctx context.Context, pods []v1.Pod, volume
}

// normalize score
numFiltered := 0
for _, node := range nodes {
if _, exists := nodeScores[node.Name]; !exists {
nodeScores[node.Name] = -1
numFiltered++
}
}

sort.Slice(nodes[:], func(i, j int) bool {
return nodeScores[nodes[i].Name] > nodeScores[nodes[j].Name]
})

return nodes
return nodes[:len(nodes)-numFiltered]
}

func (c *SharedState) filterAndSortNodes(ctx context.Context, nodes []v1.Node, pods []v1.Pod, volumes []string) ([]v1.Node, error) {
Expand Down Expand Up @@ -1973,6 +1978,7 @@ func (vq *VolumeReplicaRequestsPriorityQueue) Push(ctx context.Context, replicaR

// Pop removes a single request from the underlying queue, decrements the
// tracked queue size by one, and returns the request as a *ReplicaRequest.
//
// The queue is popped exactly once per call. Popping a second time and
// discarding the result would silently drop a queued replica request while
// the size counter is only decremented once, leaving the counter out of sync
// with the real queue length.
func (vq *VolumeReplicaRequestsPriorityQueue) Pop() *ReplicaRequest {
	// NOTE(review): the error returned by the queue is discarded here. If the
	// queue can return a nil item together with an error, the type assertion
	// below will panic — confirm against the queue implementation.
	request, _ := vq.queue.Pop()
	atomic.AddInt32(&vq.size, -1)
	return request.(*ReplicaRequest)
}
Expand Down
65 changes: 46 additions & 19 deletions pkg/controller/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package controller
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/go-logr/logr"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/util/wait"
diskv1beta1 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta1"
consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
"sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
Expand All @@ -41,10 +44,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
)

// deletionPollingInterval is the 10-second interval passed to the polling
// loop that waits for AzVolumeAttachments to be removed.
const deletionPollingInterval = 10 * time.Second

type ReconcileReplica struct {
logger logr.Logger
controllerSharedState *SharedState
Expand Down Expand Up @@ -91,14 +90,18 @@ func (r *ReconcileReplica) Reconcile(ctx context.Context, request reconcile.Requ
}
// create a replacement replica if replica attachment failed
if objectDeletionRequested(azVolumeAttachment) {
if azVolumeAttachment.Status.State == diskv1beta1.DetachmentFailed {
switch azVolumeAttachment.Status.State {
case diskv1beta1.Detaching:
case diskv1beta1.DetachmentFailed:
if err := azureutils.UpdateCRIWithRetry(ctx, nil, r.controllerSharedState.cachedClient, r.controllerSharedState.azClient, azVolumeAttachment, func(obj interface{}) error {
azVolumeAttachment := obj.(*diskv1beta1.AzVolumeAttachment)
_, err = updateState(azVolumeAttachment, diskv1beta1.ForceDetachPending, normalUpdate)
return err
}, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil {
return reconcile.Result{Requeue: true}, err
}
default:
return reconcile.Result{Requeue: true}, err
}
if !isCleanupRequested(azVolumeAttachment) || !volumeDetachRequested(azVolumeAttachment) {
go func() {
Expand Down Expand Up @@ -174,22 +177,46 @@ func (r *ReconcileReplica) triggerCreateFailedReplicas(ctx context.Context, volu
}
labelSelector := labels.NewSelector().Add(*volRequirement)
listOptions := client.ListOptions{LabelSelector: labelSelector}
err = wait.PollImmediateWithContext(ctx, deletionPollingInterval, 10*time.Minute, func(ctx context.Context) (bool, error) {
azVolumeAttachmentList := &diskv1beta1.AzVolumeAttachmentList{}
err := r.controllerSharedState.cachedClient.List(ctx, azVolumeAttachmentList, &listOptions)
if err != nil {
if errors.IsNotFound(err) {
return true, nil
azVolumeAttachmentList := &diskv1beta1.AzVolumeAttachmentList{}
if err = r.controllerSharedState.cachedClient.List(ctx, azVolumeAttachmentList, &listOptions); errors.IsNotFound(err) {
return
}

var wg sync.WaitGroup
var numErr uint32
errs := make([]error, len(azVolumeAttachmentList.Items))
for i := range azVolumeAttachmentList.Items {
wg.Add(1)
go func(index int) {
var err error
defer wg.Done()
defer func() {
if err != nil {
_ = atomic.AddUint32(&numErr, 1)
}
}()

var waiter *watcher.ConditionWaiter
waiter, err = r.controllerSharedState.conditionWatcher.NewConditionWaiter(ctx, watcher.AzVolumeAttachmentType, azVolumeAttachmentList.Items[index].Name, verifyObjectDeleted)
if err != nil {
errs[index] = err
return
}
w.Logger().Errorf(err, "Failed to get AzVolumeAttachments.")
return false, err
}
return len(azVolumeAttachmentList.Items) == 0, nil
})
if err != nil {
w.Logger().Errorf(err, "Failed polling for AzVolumeAttachments to be zero length.")
defer waiter.Close()
if _, err = waiter.Wait(ctx); err != nil {
errs[index] = err
}
}(i)
}
// wait for all AzVolumeAttachments to be deleted
wg.Wait()

if numErr > 0 {
err := status.Errorf(codes.Internal, "%+v", errs)
w.Logger().Error(err, "failed to wait for replica AzVolumeAttachments cleanup.")
return
}

r.controllerSharedState.tryCreateFailedReplicas(ctx, "replicaController")
}

Expand Down
1 change: 1 addition & 0 deletions pkg/controller/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func TestReplicaReconcile(t *testing.T) {
replicaAttachment := testReplicaAzVolumeAttachment
now := metav1.Time{Time: metav1.Now().Add(-1000)}
replicaAttachment.DeletionTimestamp = &now
replicaAttachment.Status.State = diskv1beta1.Detaching

newVolume := testAzVolume0.DeepCopy()
newVolume.Status.Detail = &diskv1beta1.AzVolumeStatusDetail{
Expand Down

0 comments on commit 7490690

Please sign in to comment.