Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propose a Scheduled condition for RB/CRB #823

Merged
merged 2 commits into from
Oct 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/apis/work/v1alpha2/binding_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ type AggregatedStatusItem struct {
AppliedMessage string `json:"appliedMessage,omitempty"`
}

// Condition types used in the status of ResourceBinding/ClusterResourceBinding.
const (
	// Scheduled represents the condition that the ResourceBinding or
	// ClusterResourceBinding has been scheduled by the scheduler.
	Scheduled string = "Scheduled"
)

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// ResourceBindingList contains a list of ResourceBinding.
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/binding/binding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (c *ResourceBindingController) Reconcile(ctx context.Context, req controlle
return c.removeFinalizer(binding)
}

isReady := helper.IsBindingReady(binding.Spec.Clusters)
isReady := helper.IsBindingReady(&binding.Status)
if !isReady {
klog.Infof("ResourceBinding(%s/%s) is not ready to sync", binding.GetNamespace(), binding.GetName())
return controllerruntime.Result{}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (c *ClusterResourceBindingController) Reconcile(ctx context.Context, req co
return c.removeFinalizer(clusterResourceBinding)
}

isReady := helper.IsBindingReady(clusterResourceBinding.Spec.Clusters)
isReady := helper.IsBindingReady(&clusterResourceBinding.Status)
if !isReady {
klog.Infof("ClusterResourceBinding %s is not ready to sync", clusterResourceBinding.GetName())
return controllerruntime.Result{}, nil
Expand Down
74 changes: 62 additions & 12 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ const (
Unknown ScheduleType = "Unknown"
)

const (
	// scheduleSuccessReason is the reason recorded on the Scheduled condition
	// when a binding has been scheduled successfully.
	scheduleSuccessReason = "BindingScheduled"

	// scheduleSuccessMessage is the human-readable message recorded on the
	// Scheduled condition alongside scheduleSuccessReason.
	scheduleSuccessMessage = "the binding has been scheduled"
)

// Failover indicates whether the scheduler should perform re-scheduling in case of cluster failure.
// TODO(RainbowMango): Remove the temporary solution by introducing a feature flag.
var Failover bool
Expand Down Expand Up @@ -472,11 +478,11 @@ func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha2.Resour
}
binding.Annotations[util.PolicyPlacementAnnotation] = placementStr

_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{})
binding, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
return s.updateBindingStatusIfNeeded(binding)
}

func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv1alpha2.ClusterResourceBinding, policy *policyv1alpha1.ClusterPropagationPolicy) (err error) {
Expand All @@ -501,11 +507,11 @@ func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv
}
binding.Annotations[util.PolicyPlacementAnnotation] = string(placement)

_, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{})
binding, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
return s.updateClusterBindingStatusIfNeeded(binding)
}

func (s *Scheduler) handleErr(err error, key interface{}) {
Expand Down Expand Up @@ -674,11 +680,11 @@ func (s *Scheduler) rescheduleClusterResourceBinding(clusterResourceBinding *wor
clusterResourceBinding.Spec.Clusters = reScheduleResult.SuggestedClusters
klog.Infof("The final binding.Spec.Cluster values are: %v\n", clusterResourceBinding.Spec.Clusters)

_, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), clusterResourceBinding, metav1.UpdateOptions{})
clusterResourceBinding, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), clusterResourceBinding, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
return s.updateClusterBindingStatusIfNeeded(clusterResourceBinding)
}

func (s *Scheduler) rescheduleResourceBinding(resourceBinding *workv1alpha2.ResourceBinding) error {
Expand All @@ -699,11 +705,11 @@ func (s *Scheduler) rescheduleResourceBinding(resourceBinding *workv1alpha2.Reso
resourceBinding.Spec.Clusters = reScheduleResult.SuggestedClusters
klog.Infof("The final binding.Spec.Cluster values are: %v\n", resourceBinding.Spec.Clusters)

_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(resourceBinding.Namespace).Update(context.TODO(), resourceBinding, metav1.UpdateOptions{})
resourceBinding, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(resourceBinding.Namespace).Update(context.TODO(), resourceBinding, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
return s.updateBindingStatusIfNeeded(resourceBinding)
}

func (s *Scheduler) scaleScheduleOne(key string) (err error) {
Expand Down Expand Up @@ -761,11 +767,11 @@ func (s *Scheduler) scaleScheduleResourceBinding(resourceBinding *workv1alpha2.R
}
binding.Annotations[util.PolicyPlacementAnnotation] = placementStr

_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{})
binding, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
return s.updateBindingStatusIfNeeded(binding)
}

func (s *Scheduler) scaleScheduleClusterResourceBinding(clusterResourceBinding *workv1alpha2.ClusterResourceBinding,
Expand All @@ -792,11 +798,11 @@ func (s *Scheduler) scaleScheduleClusterResourceBinding(clusterResourceBinding *
}
binding.Annotations[util.PolicyPlacementAnnotation] = string(placement)

_, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{})
binding, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
return s.updateClusterBindingStatusIfNeeded(binding)
}

func (s *Scheduler) getTypeFromResourceBindings(ns, name string) ScheduleType {
Expand Down Expand Up @@ -903,3 +909,47 @@ func (s *Scheduler) establishEstimatorConnections() {
}
}
}

// updateBindingStatusIfNeeded sets the scheduled condition of ResourceBinding to true if needed
func (s *Scheduler) updateBindingStatusIfNeeded(rb *workv1alpha2.ResourceBinding) error {
oldScheduledCondition := meta.FindStatusCondition(rb.Status.Conditions, workv1alpha2.Scheduled)
newScheduledCondition := metav1.Condition{
Type: workv1alpha2.Scheduled,
Status: metav1.ConditionTrue,
Reason: scheduleSuccessReason,
Message: scheduleSuccessMessage,
}
if equality.Semantic.DeepEqual(oldScheduledCondition, newScheduledCondition) {
return nil
}

meta.SetStatusCondition(&rb.Status.Conditions, newScheduledCondition)
_, err := s.KarmadaClient.WorkV1alpha2().ResourceBindings(rb.Namespace).UpdateStatus(context.TODO(), rb, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update ResourceBinding status(%s/%s): %v", rb.Namespace, rb.Name, err)
return err
dddddai marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

// updateClusterBindingStatusIfNeeded sets the scheduled condition of ClusterResourceBinding to true if needed
func (s *Scheduler) updateClusterBindingStatusIfNeeded(crb *workv1alpha2.ClusterResourceBinding) error {
oldScheduledCondition := meta.FindStatusCondition(crb.Status.Conditions, workv1alpha2.Scheduled)
newScheduledCondition := metav1.Condition{
Type: workv1alpha2.Scheduled,
Status: metav1.ConditionTrue,
Reason: scheduleSuccessReason,
Message: scheduleSuccessMessage,
}
if equality.Semantic.DeepEqual(oldScheduledCondition, newScheduledCondition) {
return nil
}

meta.SetStatusCondition(&crb.Status.Conditions, newScheduledCondition)
_, err := s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().UpdateStatus(context.TODO(), crb, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update ClusterResourceBinding status(%s): %v", crb.Name, err)
return err
dddddai marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}
4 changes: 2 additions & 2 deletions pkg/util/helper/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func SortClusterByWeight(m map[string]int64) ClusterWeightInfoList {
}

// IsBindingReady will check if resourceBinding/clusterResourceBinding is ready to build Work.
func IsBindingReady(targetClusters []workv1alpha2.TargetCluster) bool {
return len(targetClusters) != 0
func IsBindingReady(status *workv1alpha2.ResourceBindingStatus) bool {
return meta.IsStatusConditionTrue(status.Conditions, workv1alpha2.Scheduled)
}

// HasScheduledReplica checks if the scheduler has assigned replicas for each cluster.
Expand Down
9 changes: 9 additions & 0 deletions test/e2e/failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,15 @@ var _ = ginkgo.Describe("failover testing", func() {
fmt.Printf("reschedule in %d target cluster\n", totalNum)
})

ginkgo.By("check if the scheduled condition is true", func() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is unnecessary to add this judgment to determine the conition of binding.

If the condition Scheduled is not set to true, the resource will not be propagated to member clusters and the previous test will fail. Same as below.

How do you think?

Copy link
Member Author

@dddddai dddddai Oct 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@XiShanYongYe-Chang Thanks for your review!
Yes it seems unnecessary now, but it can make sure the scheduled condition is True if any changes are made in future, in which case this test should work.

From this point of view, the test is reasonable, so I'm inclined to keep it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok.

err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
rb, err := getResourceBinding(deployment)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
return meta.IsStatusConditionTrue(rb.Status.Conditions, workv1alpha2.Scheduled), nil
})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
})

ginkgo.By("recover not ready cluster", func() {
for _, disabledCluster := range disabledClusters {
fmt.Printf("cluster %s is waiting for recovering\n", disabledCluster.Name)
Expand Down
43 changes: 43 additions & 0 deletions test/e2e/scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/onsi/gomega"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -92,6 +93,15 @@ var _ = ginkgo.Describe("propagation with label and group constraints testing",
gomega.Expect(len(targetClusterNames) == minGroups).ShouldNot(gomega.BeFalse())
})

ginkgo.By("check if the scheduled condition is true", func() {
err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
rb, err := getResourceBinding(deployment)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
return meta.IsStatusConditionTrue(rb.Status.Conditions, workv1alpha2.Scheduled), nil
})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
})

ginkgo.By("check if deployment present on right clusters", func() {
for _, targetClusterName := range targetClusterNames {
clusterClient := getClusterClient(targetClusterName)
Expand Down Expand Up @@ -266,6 +276,15 @@ var _ = ginkgo.Describe("propagation with label and group constraints testing",
fmt.Printf("target clusters in cluster resource binding are %s\n", targetClusterNames)
})

ginkgo.By("check if the scheduled condition is true", func() {
err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
crb, err := getClusterResourceBinding(crd)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
return meta.IsStatusConditionTrue(crb.Status.Conditions, workv1alpha2.Scheduled), nil
})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
})

ginkgo.By("check if crd present on right clusters", func() {
for _, targetClusterName := range targetClusterNames {
clusterDynamicClient := getClusterDynamicClient(targetClusterName)
Expand Down Expand Up @@ -864,3 +883,27 @@ var _ = ginkgo.Describe("[ReplicaScheduling] ReplicaSchedulingStrategy testing",
})
})
})

// getResourceBinding fetches the ResourceBinding derived from the given
// workload object via the control-plane client.
func getResourceBinding(workload interface{}) (*workv1alpha2.ResourceBinding, error) {
	unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(workload)
	gomega.Expect(err).ShouldNot(gomega.HaveOccurred())

	resource := unstructured.Unstructured{Object: unstructuredMap}
	key := client.ObjectKey{
		Namespace: resource.GetNamespace(),
		Name:      names.GenerateBindingName(resource.GetKind(), resource.GetName()),
	}

	rb := &workv1alpha2.ResourceBinding{}
	err = controlPlaneClient.Get(context.TODO(), key, rb)
	return rb, err
}

// getClusterResourceBinding fetches the ClusterResourceBinding derived from
// the given workload object via the control-plane client.
func getClusterResourceBinding(workload interface{}) (*workv1alpha2.ClusterResourceBinding, error) {
	unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(workload)
	gomega.Expect(err).ShouldNot(gomega.HaveOccurred())

	resource := unstructured.Unstructured{Object: unstructuredMap}
	key := client.ObjectKey{
		Name: names.GenerateBindingName(resource.GetKind(), resource.GetName()),
	}

	crb := &workv1alpha2.ClusterResourceBinding{}
	err = controlPlaneClient.Get(context.TODO(), key, crb)
	return crb, err
}