Skip to content

Commit

Permalink
update Scheduled conditon when failed scheduling
Browse files Browse the repository at this point in the history
Signed-off-by: lihanbo <[email protected]>
  • Loading branch information
mrlihanbo committed Dec 1, 2021
1 parent 6dfa010 commit cb7f351
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 85 deletions.
11 changes: 5 additions & 6 deletions pkg/controllers/binding/binding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,6 @@ func (c *ResourceBindingController) Reconcile(ctx context.Context, req controlle
return c.removeFinalizer(binding)
}

isReady := helper.IsBindingReady(&binding.Status)
if !isReady {
klog.Infof("ResourceBinding(%s/%s) is not ready to sync", binding.GetNamespace(), binding.GetName())
return controllerruntime.Result{}, nil
}

return c.syncBinding(binding)
}

Expand Down Expand Up @@ -111,6 +105,11 @@ func (c *ResourceBindingController) syncBinding(binding *workv1alpha2.ResourceBi
return controllerruntime.Result{Requeue: true}, err
}

// no need to dispatch works if no clusters scheduled.
if len(clusterNames) == 0 {
return controllerruntime.Result{}, nil
}

workload, err := helper.FetchWorkload(c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource)
if err != nil {
klog.Errorf("Failed to fetch workload for resourceBinding(%s/%s). Error: %v.",
Expand Down
11 changes: 5 additions & 6 deletions pkg/controllers/binding/cluster_resource_binding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,6 @@ func (c *ClusterResourceBindingController) Reconcile(ctx context.Context, req co
return c.removeFinalizer(clusterResourceBinding)
}

isReady := helper.IsBindingReady(&clusterResourceBinding.Status)
if !isReady {
klog.Infof("ClusterResourceBinding %s is not ready to sync", clusterResourceBinding.GetName())
return controllerruntime.Result{}, nil
}

return c.syncBinding(clusterResourceBinding)
}

Expand Down Expand Up @@ -109,6 +103,11 @@ func (c *ClusterResourceBindingController) syncBinding(binding *workv1alpha2.Clu
return controllerruntime.Result{Requeue: true}, err
}

// no need to dispatch works if no clusters scheduled.
if len(clusterNames) == 0 {
return controllerruntime.Result{}, nil
}

workload, err := helper.FetchWorkload(c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource)
if err != nil {
klog.Errorf("Failed to fetch workload for clusterResourceBinding(%s). Error: %v.", binding.GetName(), err)
Expand Down
167 changes: 97 additions & 70 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

Expand All @@ -37,16 +38,12 @@ import (
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/tainttoleration"
"github.com/karmada-io/karmada/pkg/scheduler/metrics"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
)

// ScheduleType defines the schedule type of a binding object should be performed.
type ScheduleType string

const (
// FirstSchedule means the binding object hasn't been scheduled.
FirstSchedule ScheduleType = "FirstSchedule"

// ReconcileSchedule means the binding object associated policy has been changed.
ReconcileSchedule ScheduleType = "ReconcileSchedule"

Expand All @@ -58,8 +55,8 @@ const (
)

const (
scheduleSuccessReason = "BindingScheduled"

scheduleSuccessReason = "BindingScheduled"
scheduleFailedReason = "BindingFailedScheduling"
scheduleSuccessMessage = "the binding has been scheduled"
)

Expand Down Expand Up @@ -398,22 +395,32 @@ func (s *Scheduler) doScheduleBinding(namespace, name string) error {
return err
}

// Update "Scheduled" condition according to schedule result.
defer func() {
var condition metav1.Condition
if err == nil {
condition = util.NewCondition(workv1alpha2.Scheduled, scheduleSuccessReason, scheduleSuccessMessage, metav1.ConditionTrue)
} else {
condition = util.NewCondition(workv1alpha2.Scheduled, scheduleFailedReason, err.Error(), metav1.ConditionFalse)
}
if updateErr := s.updateBindingScheduledConditionIfNeeded(rb, condition); updateErr != nil {
klog.Errorf("Failed update condition(%s) for ResourceBinding(%s/%s)", workv1alpha2.Scheduled, rb.Namespace, rb.Name)
if err == nil {
// schedule succeed but update condition failed, return err in order to retry in next loop.
err = updateErr
}
}
}()

start := time.Now()
if !helper.IsBindingReady(&rb.Status) {
// the binding has not been scheduled, need schedule
klog.Infof("Start scheduling ResourceBinding(%s/%s)", namespace, name)
err = s.scheduleResourceBinding(rb)
metrics.BindingSchedule(string(FirstSchedule), metrics.SinceInSeconds(start), err)
return err
}
policyPlacement, policyPlacementStr, err := s.getPlacement(rb)
if err != nil {
return err
}
appliedPlacement := util.GetLabelValue(rb.Annotations, util.PolicyPlacementAnnotation)
if policyPlacementStr != appliedPlacement {
// policy placement changed, need reschedule
klog.Infof("Reschedule ResourceBinding(%s/%s) as placement changed", namespace, name)
// policy placement changed, need schedule
klog.Infof("Start to schedule ResourceBinding(%s/%s) as placement changed", namespace, name)
err = s.scheduleResourceBinding(rb)
metrics.BindingSchedule(string(ReconcileSchedule), metrics.SinceInSeconds(start), err)
return err
Expand Down Expand Up @@ -449,22 +456,32 @@ func (s *Scheduler) doScheduleClusterBinding(name string) error {
return err
}

// Update "Scheduled" condition according to schedule result.
defer func() {
var condition metav1.Condition
if err == nil {
condition = util.NewCondition(workv1alpha2.Scheduled, scheduleSuccessReason, scheduleSuccessMessage, metav1.ConditionTrue)
} else {
condition = util.NewCondition(workv1alpha2.Scheduled, scheduleFailedReason, err.Error(), metav1.ConditionFalse)
}
if updateErr := s.updateClusterBindingScheduledConditionIfNeeded(crb, condition); updateErr != nil {
klog.Errorf("Failed update condition(%s) for ClusterResourceBinding(%s)", workv1alpha2.Scheduled, crb.Name)
if err == nil {
// schedule succeed but update condition failed, return err in order to retry in next loop.
err = updateErr
}
}
}()

start := time.Now()
if !helper.IsBindingReady(&crb.Status) {
// the binding has not been scheduled, need schedule
klog.Infof("Start scheduling ClusterResourceBinding(%s)", name)
err = s.scheduleClusterResourceBinding(crb)
metrics.BindingSchedule(string(FirstSchedule), metrics.SinceInSeconds(start), err)
return err
}
policyPlacement, policyPlacementStr, err := s.getClusterPlacement(crb)
if err != nil {
return err
}
appliedPlacement := util.GetLabelValue(crb.Annotations, util.PolicyPlacementAnnotation)
if policyPlacementStr != appliedPlacement {
// policy placement changed, need reschedule
klog.Infof("Reschedule ClusterResourceBinding(%s) as placement changed", name)
// policy placement changed, need schedule
klog.Infof("Start to schedule ClusterResourceBinding(%s) as placement changed", name)
err = s.scheduleClusterResourceBinding(crb)
metrics.BindingSchedule(string(ReconcileSchedule), metrics.SinceInSeconds(start), err)
return err
Expand Down Expand Up @@ -513,11 +530,12 @@ func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha2.Resour
}
binding.Annotations[util.PolicyPlacementAnnotation] = placementStr

binding, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{})
_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{})
if err != nil {
return err
}
return s.updateBindingStatusIfNeeded(binding)

return nil
}

func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv1alpha2.ClusterResourceBinding) (err error) {
Expand Down Expand Up @@ -550,11 +568,12 @@ func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv
}
binding.Annotations[util.PolicyPlacementAnnotation] = string(placement)

binding, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{})
_, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{})
if err != nil {
return err
}
return s.updateClusterBindingStatusIfNeeded(binding)

return nil
}

func (s *Scheduler) handleErr(err error, key interface{}) {
Expand Down Expand Up @@ -690,11 +709,12 @@ func (s *Scheduler) rescheduleClusterResourceBinding(clusterResourceBinding *wor
clusterResourceBinding.Spec.Clusters = reScheduleResult.SuggestedClusters
klog.Infof("The final binding.Spec.Cluster values are: %v\n", clusterResourceBinding.Spec.Clusters)

clusterResourceBinding, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), clusterResourceBinding, metav1.UpdateOptions{})
_, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), clusterResourceBinding, metav1.UpdateOptions{})
if err != nil {
return err
}
return s.updateClusterBindingStatusIfNeeded(clusterResourceBinding)

return nil
}

func (s *Scheduler) rescheduleResourceBinding(resourceBinding *workv1alpha2.ResourceBinding) error {
Expand All @@ -717,11 +737,12 @@ func (s *Scheduler) rescheduleResourceBinding(resourceBinding *workv1alpha2.Reso
resourceBinding.Spec.Clusters = reScheduleResult.SuggestedClusters
klog.Infof("The final binding.Spec.Cluster values are: %v\n", resourceBinding.Spec.Clusters)

resourceBinding, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(resourceBinding.Namespace).Update(context.TODO(), resourceBinding, metav1.UpdateOptions{})
_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(resourceBinding.Namespace).Update(context.TODO(), resourceBinding, metav1.UpdateOptions{})
if err != nil {
return err
}
return s.updateBindingStatusIfNeeded(resourceBinding)

return nil
}

func (s *Scheduler) scaleScheduleResourceBinding(resourceBinding *workv1alpha2.ResourceBinding) (err error) {
Expand All @@ -748,11 +769,12 @@ func (s *Scheduler) scaleScheduleResourceBinding(resourceBinding *workv1alpha2.R
}
binding.Annotations[util.PolicyPlacementAnnotation] = placementStr

binding, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{})
_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{})
if err != nil {
return err
}
return s.updateBindingStatusIfNeeded(binding)

return nil
}

func (s *Scheduler) scaleScheduleClusterResourceBinding(clusterResourceBinding *workv1alpha2.ClusterResourceBinding) (err error) {
Expand Down Expand Up @@ -786,11 +808,12 @@ func (s *Scheduler) scaleScheduleClusterResourceBinding(clusterResourceBinding *
}
binding.Annotations[util.PolicyPlacementAnnotation] = string(placement)

binding, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{})
_, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{})
if err != nil {
return err
}
return s.updateClusterBindingStatusIfNeeded(binding)

return nil
}

func (s *Scheduler) allClustersInReadyState(tcs []workv1alpha2.TargetCluster) bool {
Expand Down Expand Up @@ -838,46 +861,50 @@ func (s *Scheduler) establishEstimatorConnections() {
}
}

// updateBindingStatusIfNeeded sets the scheduled condition of ResourceBinding to true if needed
func (s *Scheduler) updateBindingStatusIfNeeded(rb *workv1alpha2.ResourceBinding) error {
oldScheduledCondition := meta.FindStatusCondition(rb.Status.Conditions, workv1alpha2.Scheduled)
newScheduledCondition := metav1.Condition{
Type: workv1alpha2.Scheduled,
Status: metav1.ConditionTrue,
Reason: scheduleSuccessReason,
Message: scheduleSuccessMessage,
}
if equality.Semantic.DeepEqual(oldScheduledCondition, newScheduledCondition) {
// updateBindingScheduledConditionIfNeeded sets the scheduled condition of ResourceBinding if needed
func (s *Scheduler) updateBindingScheduledConditionIfNeeded(rb *workv1alpha2.ResourceBinding, newScheduledCondition metav1.Condition) error {
if rb == nil || meta.IsStatusConditionPresentAndEqual(rb.Status.Conditions, workv1alpha2.Scheduled, newScheduledCondition.Status) {
return nil
}

meta.SetStatusCondition(&rb.Status.Conditions, newScheduledCondition)
_, err := s.KarmadaClient.WorkV1alpha2().ResourceBindings(rb.Namespace).UpdateStatus(context.TODO(), rb, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update ResourceBinding status(%s/%s): %v", rb.Namespace, rb.Name, err)
return err
}
return nil
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
meta.SetStatusCondition(&rb.Status.Conditions, newScheduledCondition)
_, updateErr := s.KarmadaClient.WorkV1alpha2().ResourceBindings(rb.Namespace).UpdateStatus(context.TODO(), rb, metav1.UpdateOptions{})
if updateErr == nil {
return nil
}

if updated, err := s.KarmadaClient.WorkV1alpha2().ResourceBindings(rb.Namespace).Get(context.TODO(), rb.Name, metav1.GetOptions{}); err == nil {
// make a copy so we don't mutate the shared cache
rb = updated.DeepCopy()
} else {
klog.Errorf("failed to get updated resource binding %s/%s: %v", rb.Namespace, rb.Name, err)
}

return updateErr
})
}

// updateClusterBindingStatusIfNeeded sets the scheduled condition of ClusterResourceBinding to true if needed
func (s *Scheduler) updateClusterBindingStatusIfNeeded(crb *workv1alpha2.ClusterResourceBinding) error {
oldScheduledCondition := meta.FindStatusCondition(crb.Status.Conditions, workv1alpha2.Scheduled)
newScheduledCondition := metav1.Condition{
Type: workv1alpha2.Scheduled,
Status: metav1.ConditionTrue,
Reason: scheduleSuccessReason,
Message: scheduleSuccessMessage,
}
if equality.Semantic.DeepEqual(oldScheduledCondition, newScheduledCondition) {
// updateClusterBindingScheduledConditionIfNeeded sets the scheduled condition of ClusterResourceBinding if needed
func (s *Scheduler) updateClusterBindingScheduledConditionIfNeeded(crb *workv1alpha2.ClusterResourceBinding, newScheduledCondition metav1.Condition) error {
if crb == nil || meta.IsStatusConditionPresentAndEqual(crb.Status.Conditions, workv1alpha2.Scheduled, newScheduledCondition.Status) {
return nil
}

meta.SetStatusCondition(&crb.Status.Conditions, newScheduledCondition)
_, err := s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().UpdateStatus(context.TODO(), crb, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update ClusterResourceBinding status(%s): %v", crb.Name, err)
return err
}
return nil
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
meta.SetStatusCondition(&crb.Status.Conditions, newScheduledCondition)
_, updateErr := s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().UpdateStatus(context.TODO(), crb, metav1.UpdateOptions{})
if updateErr == nil {
return nil
}

if updated, err := s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Get(context.TODO(), crb.Name, metav1.GetOptions{}); err == nil {
// make a copy so we don't mutate the shared cache
crb = updated.DeepCopy()
} else {
klog.Errorf("failed to get updated cluster resource binding %s: %v", crb.Name, err)
}

return updateErr
})
}
4 changes: 2 additions & 2 deletions pkg/util/helper/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func SortClusterByWeight(m map[string]int64) ClusterWeightInfoList {
return p
}

// IsBindingReady will check if resourceBinding/clusterResourceBinding is ready to build Work.
func IsBindingReady(status *workv1alpha2.ResourceBindingStatus) bool {
// IsBindingScheduled will check if resourceBinding/clusterResourceBinding is successfully scheduled.
func IsBindingScheduled(status *workv1alpha2.ResourceBindingStatus) bool {
return meta.IsStatusConditionTrue(status.Conditions, workv1alpha2.Scheduled)
}

Expand Down
2 changes: 1 addition & 1 deletion test/e2e/framework/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func ExtractTargetClustersFrom(c client.Client, deployment *appsv1.Deployment) [
err := c.Get(context.TODO(), client.ObjectKey{Namespace: deployment.Namespace, Name: bindingName}, binding)
g.Expect(err).NotTo(gomega.HaveOccurred())

if !helper.IsBindingReady(&binding.Status) {
if !helper.IsBindingScheduled(&binding.Status) {
klog.Infof("The ResourceBinding(%s/%s) hasn't been scheduled.", binding.Namespace, binding.Name)
return false, nil
}
Expand Down

0 comments on commit cb7f351

Please sign in to comment.