diff --git a/pkg/apis/work/v1alpha2/binding_types.go b/pkg/apis/work/v1alpha2/binding_types.go index 10cd0bdf910f..ca292ab4a82c 100644 --- a/pkg/apis/work/v1alpha2/binding_types.go +++ b/pkg/apis/work/v1alpha2/binding_types.go @@ -135,6 +135,12 @@ type AggregatedStatusItem struct { AppliedMessage string `json:"appliedMessage,omitempty"` } +// Conditions definition +const ( + // Scheduled represents the condition that the ResourceBinding or ClusterResourceBinding has been scheduled. + Scheduled string = "Scheduled" +) + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // ResourceBindingList contains a list of ResourceBinding. diff --git a/pkg/controllers/binding/binding_controller.go b/pkg/controllers/binding/binding_controller.go index cd069df99901..cbae87f5b4b0 100644 --- a/pkg/controllers/binding/binding_controller.go +++ b/pkg/controllers/binding/binding_controller.go @@ -65,7 +65,7 @@ func (c *ResourceBindingController) Reconcile(ctx context.Context, req controlle return c.removeFinalizer(binding) } - isReady := helper.IsBindingReady(binding.Spec.Clusters) + isReady := helper.IsBindingReady(&binding.Status) if !isReady { klog.Infof("ResourceBinding(%s/%s) is not ready to sync", binding.GetNamespace(), binding.GetName()) return controllerruntime.Result{}, nil diff --git a/pkg/controllers/binding/cluster_resource_binding_controller.go b/pkg/controllers/binding/cluster_resource_binding_controller.go index d28fc6e8c5f8..1054ae6401bf 100644 --- a/pkg/controllers/binding/cluster_resource_binding_controller.go +++ b/pkg/controllers/binding/cluster_resource_binding_controller.go @@ -65,7 +65,7 @@ func (c *ClusterResourceBindingController) Reconcile(ctx context.Context, req co return c.removeFinalizer(clusterResourceBinding) } - isReady := helper.IsBindingReady(clusterResourceBinding.Spec.Clusters) + isReady := helper.IsBindingReady(&clusterResourceBinding.Status) if !isReady { klog.Infof("ClusterResourceBinding %s is not ready to sync", clusterResourceBinding.GetName()) return controllerruntime.Result{}, nil diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index d6d1d3d6d83d..53d7b63e9b32 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -61,6 +61,12 @@ const ( Unknown ScheduleType = "Unknown" ) +const ( + scheduleSuccessReason = "BindingScheduled" + + scheduleSuccessMessage = "the binding has been scheduled" +) + // Failover indicates if the scheduler should performs re-scheduler in case of cluster failure. // TODO(RainbowMango): Remove the temporary solution by introducing feature flag var Failover bool @@ -472,11 +478,11 @@ func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha2.Resour } binding.Annotations[util.PolicyPlacementAnnotation] = placementStr - _, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{}) + binding, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{}) if err != nil { return err } - return nil + return s.updateBindingStatusIfNeeded(binding) } func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv1alpha2.ClusterResourceBinding, policy *policyv1alpha1.ClusterPropagationPolicy) (err error) { @@ -501,11 +507,11 @@ func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv } binding.Annotations[util.PolicyPlacementAnnotation] = string(placement) - _, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{}) + binding, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{}) if err != nil { return err } - return nil + return s.updateClusterBindingStatusIfNeeded(binding) } func (s *Scheduler) handleErr(err error, key interface{}) { @@ -674,11 +680,11 @@ func (s *Scheduler) rescheduleClusterResourceBinding(clusterResourceBinding *wor clusterResourceBinding.Spec.Clusters = reScheduleResult.SuggestedClusters klog.Infof("The final binding.Spec.Cluster values are: %v\n", clusterResourceBinding.Spec.Clusters) - _, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), clusterResourceBinding, metav1.UpdateOptions{}) + clusterResourceBinding, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), clusterResourceBinding, metav1.UpdateOptions{}) if err != nil { return err } - return nil + return s.updateClusterBindingStatusIfNeeded(clusterResourceBinding) } func (s *Scheduler) rescheduleResourceBinding(resourceBinding *workv1alpha2.ResourceBinding) error { @@ -699,11 +705,11 @@ func (s *Scheduler) rescheduleResourceBinding(resourceBinding *workv1alpha2.Reso resourceBinding.Spec.Clusters = reScheduleResult.SuggestedClusters klog.Infof("The final binding.Spec.Cluster values are: %v\n", resourceBinding.Spec.Clusters) - _, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(resourceBinding.Namespace).Update(context.TODO(), resourceBinding, metav1.UpdateOptions{}) + resourceBinding, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(resourceBinding.Namespace).Update(context.TODO(), resourceBinding, metav1.UpdateOptions{}) if err != nil { return err } - return nil + return s.updateBindingStatusIfNeeded(resourceBinding) } func (s *Scheduler) scaleScheduleOne(key string) (err error) { @@ -761,11 +767,11 @@ func (s *Scheduler) scaleScheduleResourceBinding(resourceBinding *workv1alpha2.R } binding.Annotations[util.PolicyPlacementAnnotation] = placementStr - _, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{}) + binding, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{}) if err != nil { return err } - return nil + return s.updateBindingStatusIfNeeded(binding) } func (s *Scheduler) scaleScheduleClusterResourceBinding(clusterResourceBinding *workv1alpha2.ClusterResourceBinding, @@ -792,11 +798,11 @@ func (s *Scheduler) scaleScheduleClusterResourceBinding(clusterResourceBinding * } binding.Annotations[util.PolicyPlacementAnnotation] = string(placement) - _, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{}) + binding, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{}) if err != nil { return err } - return nil + return s.updateClusterBindingStatusIfNeeded(binding) } func (s *Scheduler) getTypeFromResourceBindings(ns, name string) ScheduleType { @@ -903,3 +909,47 @@ func (s *Scheduler) establishEstimatorConnections() { } } } + +// updateBindingStatusIfNeeded sets the scheduled condition of ResourceBinding to true if needed +func (s *Scheduler) updateBindingStatusIfNeeded(rb *workv1alpha2.ResourceBinding) error { + oldScheduledCondition := meta.FindStatusCondition(rb.Status.Conditions, workv1alpha2.Scheduled) + newScheduledCondition := metav1.Condition{ + Type: workv1alpha2.Scheduled, + Status: metav1.ConditionTrue, + Reason: scheduleSuccessReason, + Message: scheduleSuccessMessage, + } + if equality.Semantic.DeepEqual(oldScheduledCondition, newScheduledCondition) { + return nil + } + + meta.SetStatusCondition(&rb.Status.Conditions, newScheduledCondition) + _, err := s.KarmadaClient.WorkV1alpha2().ResourceBindings(rb.Namespace).UpdateStatus(context.TODO(), rb, metav1.UpdateOptions{}) + if err != nil { + klog.Errorf("Failed to update ResourceBinding status(%s/%s): %v", rb.Namespace, rb.Name, err) + return err + } + return nil +} + +// updateClusterBindingStatusIfNeeded sets the scheduled condition of ClusterResourceBinding to true if needed +func (s *Scheduler) updateClusterBindingStatusIfNeeded(crb *workv1alpha2.ClusterResourceBinding) error { + oldScheduledCondition := meta.FindStatusCondition(crb.Status.Conditions, workv1alpha2.Scheduled) + newScheduledCondition := metav1.Condition{ + Type: workv1alpha2.Scheduled, + Status: metav1.ConditionTrue, + Reason: scheduleSuccessReason, + Message: scheduleSuccessMessage, + } + if equality.Semantic.DeepEqual(oldScheduledCondition, newScheduledCondition) { + return nil + } + + meta.SetStatusCondition(&crb.Status.Conditions, newScheduledCondition) + _, err := s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().UpdateStatus(context.TODO(), crb, metav1.UpdateOptions{}) + if err != nil { + klog.Errorf("Failed to update ClusterResourceBinding status(%s): %v", crb.Name, err) + return err + } + return nil +} diff --git a/pkg/util/helper/binding.go b/pkg/util/helper/binding.go index ed279496480b..2c26f616bfa4 100644 --- a/pkg/util/helper/binding.go +++ b/pkg/util/helper/binding.go @@ -55,8 +55,8 @@ func SortClusterByWeight(m map[string]int64) ClusterWeightInfoList { } // IsBindingReady will check if resourceBinding/clusterResourceBinding is ready to build Work. -func IsBindingReady(targetClusters []workv1alpha2.TargetCluster) bool { - return len(targetClusters) != 0 +func IsBindingReady(status *workv1alpha2.ResourceBindingStatus) bool { + return meta.IsStatusConditionTrue(status.Conditions, workv1alpha2.Scheduled) } // HasScheduledReplica checks if the scheduler has assigned replicas for each cluster. diff --git a/test/e2e/failover_test.go b/test/e2e/failover_test.go index a62864bb6f38..4bf847763a96 100644 --- a/test/e2e/failover_test.go +++ b/test/e2e/failover_test.go @@ -152,6 +152,15 @@ var _ = ginkgo.Describe("failover testing", func() { fmt.Printf("reschedule in %d target cluster\n", totalNum) }) + ginkgo.By("check if the scheduled condition is true", func() { + err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) { + rb, err := getResourceBinding(deployment) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + return meta.IsStatusConditionTrue(rb.Status.Conditions, workv1alpha2.Scheduled), nil + }) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + }) + ginkgo.By("recover not ready cluster", func() { for _, disabledCluster := range disabledClusters { fmt.Printf("cluster %s is waiting for recovering\n", disabledCluster.Name) diff --git a/test/e2e/scheduling_test.go b/test/e2e/scheduling_test.go index 56121715baef..5986a9428127 100644 --- a/test/e2e/scheduling_test.go +++ b/test/e2e/scheduling_test.go @@ -9,6 +9,7 @@ import ( "github.com/onsi/gomega" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -92,6 +93,15 @@ var _ = ginkgo.Describe("propagation with label and group constraints testing", gomega.Expect(len(targetClusterNames) == minGroups).ShouldNot(gomega.BeFalse()) }) + ginkgo.By("check if the scheduled condition is true", func() { + err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) { + rb, err := getResourceBinding(deployment) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + return meta.IsStatusConditionTrue(rb.Status.Conditions, workv1alpha2.Scheduled), nil + }) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + }) + ginkgo.By("check if deployment present on right clusters", func() { for _, targetClusterName := range targetClusterNames { clusterClient := getClusterClient(targetClusterName) @@ -266,6 +276,15 @@ var _ = ginkgo.Describe("propagation with label and group constraints testing", fmt.Printf("target clusters in cluster resource binding are %s\n", targetClusterNames) }) + ginkgo.By("check if the scheduled condition is true", func() { + err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) { + crb, err := getClusterResourceBinding(crd) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + return meta.IsStatusConditionTrue(crb.Status.Conditions, workv1alpha2.Scheduled), nil + }) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + }) + ginkgo.By("check if crd present on right clusters", func() { for _, targetClusterName := range targetClusterNames { clusterDynamicClient := getClusterDynamicClient(targetClusterName) @@ -864,3 +883,27 @@ var _ = ginkgo.Describe("[ReplicaScheduling] ReplicaSchedulingStrategy testing", }) }) }) + +// get the resource binding associated with the workload +func getResourceBinding(workload interface{}) (*workv1alpha2.ResourceBinding, error) { + uncastObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(workload) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + obj := unstructured.Unstructured{Object: uncastObj} + bindingName := names.GenerateBindingName(obj.GetKind(), obj.GetName()) + binding := &workv1alpha2.ResourceBinding{} + + err = controlPlaneClient.Get(context.TODO(), client.ObjectKey{Namespace: obj.GetNamespace(), Name: bindingName}, binding) + return binding, err +} + +// get the cluster resource binding associated with the workload +func getClusterResourceBinding(workload interface{}) (*workv1alpha2.ClusterResourceBinding, error) { + uncastObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(workload) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + obj := unstructured.Unstructured{Object: uncastObj} + bindingName := names.GenerateBindingName(obj.GetKind(), obj.GetName()) + binding := &workv1alpha2.ClusterResourceBinding{} + + err = controlPlaneClient.Get(context.TODO(), client.ObjectKey{Name: bindingName}, binding) + return binding, err +}