diff --git a/pkg/controllers/alibaba_cloud_slb/alibaba_cloud_slb_controller.go b/pkg/controllers/alibaba_cloud_slb/alibaba_cloud_slb_controller.go index 462b8989..f0efe852 100644 --- a/pkg/controllers/alibaba_cloud_slb/alibaba_cloud_slb_controller.go +++ b/pkg/controllers/alibaba_cloud_slb/alibaba_cloud_slb_controller.go @@ -97,10 +97,6 @@ func (r *ReconcileAdapter) DeleteEmployer(employer client.Object, toDelete []res return nil, nil, nil } -func (r *ReconcileAdapter) RecordEmployer(succCreate, succUpdate, succDelete []resourceconsist.IEmployer) error { - return nil -} - func (r *ReconcileAdapter) GetExpectEmployee(ctx context.Context, employer client.Object) ([]resourceconsist.IEmployee, error) { svc, ok := employer.(*corev1.Service) if !ok { diff --git a/pkg/controllers/demoresourceconsist/demoresourceconsist_controller.go b/pkg/controllers/demoresourceconsist/demoresourceconsist_controller.go deleted file mode 100644 index 8d86a3a5..00000000 --- a/pkg/controllers/demoresourceconsist/demoresourceconsist_controller.go +++ /dev/null @@ -1,333 +0,0 @@ -/* -Copyright 2023 The KusionStack Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package demoresourceconsist - -import ( - "context" - "fmt" - "strconv" - "strings" - - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" - "kusionstack.io/kafed/pkg/controllers/resourceconsist" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/predicate" -) - -type ReconcileAdapter struct { - client.Client -} - -var _ resourceconsist.ReconcileAdapter = &ReconcileAdapter{} - -func NewReconcileAdapter(c client.Client) *ReconcileAdapter { - return &ReconcileAdapter{ - Client: c, - } -} - -func (r *ReconcileAdapter) GetSelectedEmployeeNames(ctx context.Context, employer client.Object) ([]string, error) { - svc, ok := employer.(*corev1.Service) - if !ok { - return nil, fmt.Errorf("expect employer kind is Service") - } - selector := labels.Set(svc.Spec.Selector).AsSelectorPreValidated() - var podList corev1.PodList - err := r.List(ctx, &podList, &client.ListOptions{Namespace: svc.Namespace, LabelSelector: selector}) - if err != nil { - return nil, err - } - - selected := make([]string, len(podList.Items)) - for idx, pod := range podList.Items { - selected[idx] = pod.Name - } - - return selected, nil -} - -func (r *ReconcileAdapter) GetControllerName() string { - return "demo-controller" -} - -func (r *ReconcileAdapter) NewEmployer() client.Object { - return &corev1.Service{} -} - -func (r *ReconcileAdapter) NewEmployee() client.Object { - return &corev1.Pod{} -} - -func (r *ReconcileAdapter) EmployerEventHandler() handler.EventHandler { - return &EnqueueServiceWithRateLimit{} -} - -func (r *ReconcileAdapter) EmployeeEventHandler() handler.EventHandler { - return &EnqueueServiceByPod{ - c: r.Client, - } -} - -func (r *ReconcileAdapter) EmployerPredicates() predicate.Funcs { - return predicate.Funcs{ - CreateFunc: func(event event.CreateEvent) bool { - return doPredicate(event.Object) - }, - DeleteFunc: func(event event.DeleteEvent) bool { - return doPredicate(event.Object) - }, - UpdateFunc: func(event event.UpdateEvent) bool { - return doPredicate(event.ObjectNew) - }, - GenericFunc: func(event event.GenericEvent) bool { - return doPredicate(event.Object) - }, - } -} - -func (r *ReconcileAdapter) EmployeePredicates() predicate.Funcs { - return predicate.Funcs{} -} - -func (r *ReconcileAdapter) NotFollowPodOpsLifeCycle() bool { - return false -} - -func (r *ReconcileAdapter) GetExpectEmployer(ctx context.Context, employer client.Object) ([]resourceconsist.IEmployer, error) { - return nil, nil -} - -func (r *ReconcileAdapter) GetCurrentEmployer(ctx context.Context, employer client.Object) ([]resourceconsist.IEmployer, error) { - return nil, nil -} - -func (r *ReconcileAdapter) CreateEmployer(employer client.Object, toCreate []resourceconsist.IEmployer) ([]resourceconsist.IEmployer, []resourceconsist.IEmployer, error) { - return nil, nil, nil -} - -func (r *ReconcileAdapter) UpdateEmployer(employer client.Object, toUpdate []resourceconsist.IEmployer) ([]resourceconsist.IEmployer, []resourceconsist.IEmployer, error) { - return nil, nil, nil -} - -func (r *ReconcileAdapter) DeleteEmployer(employer client.Object, toDelete []resourceconsist.IEmployer) ([]resourceconsist.IEmployer, []resourceconsist.IEmployer, error) { - return nil, nil, nil -} - -func (r *ReconcileAdapter) RecordEmployer(succCreate, succUpdate, succDelete []resourceconsist.IEmployer) error { - return nil -} - -// GetExpectEmployeeStatus return expect employee status -func (r *ReconcileAdapter) GetExpectEmployee(ctx context.Context, employer client.Object) ([]resourceconsist.IEmployee, error) { - if !employer.GetDeletionTimestamp().IsZero() { - return []resourceconsist.IEmployee{}, nil - } - - svc, ok := employer.(*corev1.Service) - if !ok { - return nil, fmt.Errorf("expect employer kind is Service") - } - selector := labels.Set(svc.Spec.Selector).AsSelectorPreValidated() - - var podList corev1.PodList - err := r.List(ctx, &podList, &client.ListOptions{Namespace: svc.Namespace, LabelSelector: selector}) - if err != nil { - return nil, err - } - - expected := make([]resourceconsist.IEmployee, len(podList.Items)) - expectIdx := 0 - for _, pod := range podList.Items { - if !pod.DeletionTimestamp.IsZero() { - continue - } - status := DemoPodStatus{ - EmployeeId: pod.Name, - EmployeeName: pod.Name, - } - employeeStatuses, err := resourceconsist.GetCommonPodEmployeeStatus(&pod) - if err != nil { - return nil, err - } - extraStatus := PodExtraStatus{} - if employeeStatuses.LifecycleReady { - extraStatus.TrafficOn = true - extraStatus.TrafficWeight = 100 - } else { - extraStatus.TrafficOn = false - extraStatus.TrafficWeight = 0 - } - employeeStatuses.ExtraStatus = extraStatus - status.EmployeeStatuses = employeeStatuses - expected[expectIdx] = status - expectIdx++ - } - - return expected[:expectIdx], nil -} - -func (r *ReconcileAdapter) GetCurrentEmployee(ctx context.Context, employer client.Object) ([]resourceconsist.IEmployee, error) { - svc, ok := employer.(*corev1.Service) - if !ok { - return nil, fmt.Errorf("expect employer kind is Service") - } - - if svc.GetAnnotations()["demo-added-pods"] == "" { - return nil, nil - } - - addedPodNames := strings.Split(svc.GetAnnotations()["demo-added-pods"], ",") - current := make([]resourceconsist.IEmployee, len(addedPodNames)) - currentIdx := 0 - - for _, podName := range addedPodNames { - pod := &corev1.Pod{} - err := r.Get(ctx, types.NamespacedName{ - Namespace: employer.GetNamespace(), - Name: podName, - }, pod) - if err != nil { - if errors.IsNotFound(err) { - continue - } - return nil, err - } - status := DemoPodStatus{ - EmployeeId: podName, - EmployeeName: podName, - } - employeeStatus, err := resourceconsist.GetCommonPodEmployeeStatus(pod) - if err != nil { - return nil, err - } - extraStatus := PodExtraStatus{} - if pod.GetLabels()["demo-traffic-on"] == "true" { - extraStatus.TrafficOn = true - } - if pod.GetLabels()["demo-traffic-weight"] != "" { - extraStatus.TrafficWeight, _ = strconv.Atoi(pod.GetLabels()["demo-traffic-weight"]) - } - employeeStatus.ExtraStatus = extraStatus - status.EmployeeStatuses = employeeStatus - current[currentIdx] = status - currentIdx++ - } - return current[:currentIdx], nil -} - -func (r *ReconcileAdapter) CreateEmployees(employer client.Object, toCreate []resourceconsist.IEmployee) ([]resourceconsist.IEmployee, []resourceconsist.IEmployee, error) { - if employer == nil { - return nil, nil, fmt.Errorf("employer is nil") - } - if len(toCreate) == 0 { - return toCreate, nil, nil - } - toAddNames := make([]string, len(toCreate)) - toAddIdx := 0 - for _, employee := range toCreate { - toAddNames[toAddIdx] = employee.GetEmployeeName() - toAddIdx++ - } - toAddNames = toAddNames[:toAddIdx] - anno := employer.GetAnnotations() - if anno["demo-added-pods"] == "" { - anno["demo-added-pods"] = strings.Join(toAddNames, ",") - } else { - anno["demo-added-pods"] = anno["demo-added-pods"] + "," + strings.Join(toAddNames, ",") - } - employer.SetAnnotations(anno) - err := r.Client.Update(context.Background(), employer) - if err != nil { - return nil, nil, err - } - return toCreate, nil, nil -} - -func (r *ReconcileAdapter) UpdateEmployees(employer client.Object, toUpdate []resourceconsist.IEmployee) ([]resourceconsist.IEmployee, []resourceconsist.IEmployee, error) { - if employer == nil { - return nil, nil, fmt.Errorf("employer is nil") - } - if len(toUpdate) == 0 { - return toUpdate, nil, nil - } - succUpdate := make([]resourceconsist.IEmployee, len(toUpdate)) - failUpdate := make([]resourceconsist.IEmployee, len(toUpdate)) - succUpdateIdx, failUpdateIdx := 0, 0 - for _, employee := range toUpdate { - pod := &corev1.Pod{} - err := r.Get(context.Background(), types.NamespacedName{ - Namespace: employer.GetNamespace(), - Name: employee.GetEmployeeName(), - }, pod) - podEmployeeStatus := employee.GetEmployeeStatuses().(resourceconsist.PodEmployeeStatuses) - if err != nil { - return succUpdate, failUpdate, err - } - extraStatus := podEmployeeStatus.ExtraStatus.(PodExtraStatus) - if extraStatus.TrafficOn { - pod.GetLabels()["demo-traffic-on"] = "true" - } - pod.GetLabels()["demo-traffic-weight"] = strconv.Itoa(extraStatus.TrafficWeight) - err = r.Client.Update(context.Background(), pod) - if err != nil { - failUpdate[failUpdateIdx] = employee - failUpdateIdx++ - continue - } - succUpdate[succUpdateIdx] = employee - succUpdateIdx++ - } - return succUpdate[:succUpdateIdx], failUpdate[:failUpdateIdx], nil -} - -func (r *ReconcileAdapter) DeleteEmployees(employer client.Object, toDelete []resourceconsist.IEmployee) ([]resourceconsist.IEmployee, []resourceconsist.IEmployee, error) { - if employer == nil { - return nil, nil, fmt.Errorf("employer is nil") - } - if len(toDelete) == 0 { - return toDelete, nil, nil - } - - toDeleteMap := make(map[string]bool) - for _, employee := range toDelete { - toDeleteMap[employee.GetEmployeeName()] = true - } - - addedPodNames := strings.Split(employer.GetAnnotations()["demo-added-pods"], ",") - - afterDeleteIdx := 0 - for _, added := range addedPodNames { - if !toDeleteMap[added] { - addedPodNames[afterDeleteIdx] = added - afterDeleteIdx++ - } - } - addedPodNames = addedPodNames[:afterDeleteIdx] - anno := employer.GetAnnotations() - anno["demo-added-pods"] = strings.Join(addedPodNames, ",") - employer.SetAnnotations(anno) - err := r.Client.Update(context.Background(), employer) - if err != nil { - return nil, nil, err - } - return toDelete, nil, nil -} diff --git a/pkg/controllers/demoresourceconsist/event_handler.go b/pkg/controllers/demoresourceconsist/event_handler.go deleted file mode 100644 index c7a90eb7..00000000 --- a/pkg/controllers/demoresourceconsist/event_handler.go +++ /dev/null @@ -1,207 +0,0 @@ -/* -Copyright 2023 The KusionStack Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package demoresourceconsist - -import ( - "context" - - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/util/workqueue" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/reconcile" -) - -var _ handler.EventHandler = &EnqueueServiceWithRateLimit{} - -var _ handler.EventHandler = &EnqueueServiceByPod{} - -type EnqueueServiceWithRateLimit struct{} - -type EnqueueServiceByPod struct { - c client.Client -} - -func (e *EnqueueServiceWithRateLimit) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { - if evt.Object == nil { - return - } - - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ - Name: evt.Object.GetName(), - Namespace: evt.Object.GetNamespace(), - }}) -} - -func (e *EnqueueServiceWithRateLimit) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { - if evt.ObjectOld != nil { - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ - Name: evt.ObjectOld.GetName(), - Namespace: evt.ObjectOld.GetNamespace(), - }}) - } - - if evt.ObjectNew != nil { - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ - Name: evt.ObjectNew.GetName(), - Namespace: evt.ObjectNew.GetNamespace(), - }}) - } -} - -func (e *EnqueueServiceWithRateLimit) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { - if evt.Object == nil { - return - } - - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ - Name: evt.Object.GetName(), - Namespace: evt.Object.GetNamespace(), - }}) -} - -func (e *EnqueueServiceWithRateLimit) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { - if evt.Object == nil { - return - } - - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ - Name: evt.Object.GetName(), - Namespace: evt.Object.GetNamespace(), - }}) -} - -func (e *EnqueueServiceByPod) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { - serviceNames, err := GetEmployerByEmployee(context.Background(), e.c, evt.Object) - if err != nil { - return - } - for _, obj := range serviceNames { - svc := &corev1.Service{} - err = e.c.Get(context.Background(), types.NamespacedName{Name: obj, Namespace: evt.Object.GetNamespace()}, svc) - if err != nil { - continue - } - if doPredicate(svc) { - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Namespace: evt.Object.GetNamespace(), Name: obj}}) - } - } -} - -func (e *EnqueueServiceByPod) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { - serviceNamesOld, err := GetEmployerByEmployee(context.Background(), e.c, evt.ObjectOld) - if err != nil { - return - } - for _, obj := range serviceNamesOld { - svc := &corev1.Service{} - err = e.c.Get(context.Background(), types.NamespacedName{Name: obj, Namespace: evt.ObjectOld.GetNamespace()}, svc) - if err != nil { - continue - } - if doPredicate(svc) { - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Namespace: evt.ObjectOld.GetNamespace(), Name: obj}}) - } - } - - serviceNamesNew, err := GetEmployerByEmployee(context.Background(), e.c, evt.ObjectNew) - if err != nil { - return - } - for _, obj := range serviceNamesNew { - svc := &corev1.Service{} - err = e.c.Get(context.Background(), types.NamespacedName{Name: obj, Namespace: evt.ObjectNew.GetNamespace()}, svc) - if err != nil { - continue - } - if doPredicate(svc) { - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Namespace: evt.ObjectNew.GetNamespace(), Name: obj}}) - } - } -} - -func (e *EnqueueServiceByPod) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { - serviceNames, err := GetEmployerByEmployee(context.Background(), e.c, evt.Object) - if err != nil { - return - } - for _, obj := range serviceNames { - svc := &corev1.Service{} - err = e.c.Get(context.Background(), types.NamespacedName{Name: obj, Namespace: evt.Object.GetNamespace()}, svc) - if err != nil { - continue - } - if doPredicate(svc) { - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Namespace: evt.Object.GetNamespace(), Name: obj}}) - } - } -} - -func (e *EnqueueServiceByPod) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { - serviceNames, err := GetEmployerByEmployee(context.Background(), e.c, evt.Object) - if err != nil { - return - } - for _, obj := range serviceNames { - svc := &corev1.Service{} - err = e.c.Get(context.Background(), types.NamespacedName{Name: obj, Namespace: evt.Object.GetNamespace()}, svc) - if err != nil { - continue - } - if doPredicate(svc) { - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Namespace: evt.Object.GetNamespace(), Name: obj}}) - } - } -} - -func GetEmployerByEmployee(ctx context.Context, client client.Client, employee client.Object) ([]string, error) { - if employee.GetLabels() == nil { - return nil, nil - } - - services := &corev1.ServiceList{} - if err := client.List(ctx, services); err != nil { - return nil, err - } - - employeeLabels := labels.Set(employee.GetLabels()) - var names []string - for _, svc := range services.Items { - selector := labels.Set(svc.Spec.Selector).AsSelector() - if selector.Matches(employeeLabels) { - names = append(names, svc.GetName()) - } - } - - return names, nil -} - -func doPredicate(obj client.Object) bool { - if obj == nil { - return false - } - if obj.GetAnnotations() != nil { - value, exist := obj.GetAnnotations()["demo-controller-service"] - if exist && value == "true" { - return true - } - } - return false -} diff --git a/pkg/controllers/demoresourceconsist/types.go b/pkg/controllers/demoresourceconsist/types.go deleted file mode 100644 index 2580182f..00000000 --- a/pkg/controllers/demoresourceconsist/types.go +++ /dev/null @@ -1,85 +0,0 @@ -/* -Copyright 2023 The KusionStack Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package demoresourceconsist - -import ( - "fmt" - "reflect" - - "kusionstack.io/kafed/pkg/controllers/resourceconsist" -) - -var _ resourceconsist.IEmployer = DemoServiceStatus{} -var _ resourceconsist.IEmployee = DemoPodStatus{} - -type DemoServiceStatus struct { - EmployerId string - EmployerStatuses interface{} -} - -func (d DemoServiceStatus) GetEmployerId() string { - return d.EmployerId -} - -func (d DemoServiceStatus) GetEmployerStatuses() interface{} { - return d.EmployerStatuses -} - -func (d DemoServiceStatus) EmployerEqual(employerStatuses interface{}) (bool, error) { - return true, nil -} - -type DemoPodStatus struct { - EmployeeId string - EmployeeName string - EmployeeStatuses resourceconsist.PodEmployeeStatuses -} - -func (d DemoPodStatus) GetEmployeeId() string { - return d.EmployeeId -} - -func (d DemoPodStatus) GetEmployeeName() string { - return d.EmployeeName -} - -func (d DemoPodStatus) GetEmployeeStatuses() interface{} { - return d.EmployeeStatuses -} - -func (d DemoPodStatus) EmployeeEqual(employeeStatus resourceconsist.IEmployee) (bool, error) { - if d.EmployeeName != employeeStatus.GetEmployeeName() { - return false, nil - } - - podEmployeeStatuses, ok := employeeStatus.GetEmployeeStatuses().(resourceconsist.PodEmployeeStatuses) - if !ok { - return false, fmt.Errorf("employee to diff is not Pod Employee status") - } - - if d.EmployeeStatuses.Ip != podEmployeeStatuses.Ip || d.EmployeeStatuses.LifecycleReady != podEmployeeStatuses.LifecycleReady || - d.EmployeeStatuses.Ipv6 != podEmployeeStatuses.Ipv6 { - return false, nil - } - - return reflect.DeepEqual(d.EmployeeStatuses.ExtraStatus, podEmployeeStatuses.ExtraStatus), nil -} - -type PodExtraStatus struct { - TrafficOn bool - TrafficWeight int -} diff --git a/pkg/controllers/resourceconsist/consister.go b/pkg/controllers/resourceconsist/consister.go index d3ace55d..d2770f49 100644 --- a/pkg/controllers/resourceconsist/consister.go +++ b/pkg/controllers/resourceconsist/consister.go @@ -38,24 +38,19 @@ func (r *Consist) syncEmployer(ctx context.Context, employer client.Object, expe if err != nil { return false, false, fmt.Errorf("diff employer failed, err: %s", err.Error()) } - succCreate, failCreate, err := r.adapter.CreateEmployer(employer, toCudEmployer.ToCreate) + _, failCreate, err := r.adapter.CreateEmployer(employer, toCudEmployer.ToCreate) if err != nil { return false, false, fmt.Errorf("syncCreate failed, err: %s", err.Error()) } - succUpdate, failUpdate, err := r.adapter.UpdateEmployer(employer, toCudEmployer.ToUpdate) + _, failUpdate, err := r.adapter.UpdateEmployer(employer, toCudEmployer.ToUpdate) if err != nil { return false, false, fmt.Errorf("syncUpdate failed, err: %s", err.Error()) } - succDelete, failDelete, err := r.adapter.DeleteEmployer(employer, toCudEmployer.ToDelete) + _, failDelete, err := r.adapter.DeleteEmployer(employer, toCudEmployer.ToDelete) if err != nil { return false, false, fmt.Errorf("syncDelete failed, err: %s", err.Error()) } - err = r.adapter.RecordEmployer(succCreate, succUpdate, succDelete) - if err != nil { - return false, false, fmt.Errorf("record employer failed, err: %s", err.Error()) - } - isClean := len(toCudEmployer.Unchanged) == 0 && len(toCudEmployer.ToCreate) == 0 && len(toCudEmployer.ToUpdate) == 0 && len(toCudEmployer.ToDelete) == 0 cudFailedExist := len(failCreate) > 0 || len(failUpdate) > 0 || len(failDelete) > 0 return isClean, cudFailedExist, nil @@ -85,7 +80,7 @@ func (r *Consist) diffEmployer(expectEmployer, currentEmployer []IEmployer) (ToC toCreateIdx++ continue } - equal, err := expect.EmployerEqual(current.GetEmployerStatuses()) + equal, err := expect.EmployerEqual(current) if err != nil { return ToCUDEmployer{}, err } @@ -190,10 +185,6 @@ func (r *Consist) syncEmployees(ctx context.Context, employer client.Object, exp return false, false, err } - // todo, to be removed, for demo - r.Recorder.Eventf(employer, corev1.EventTypeNormal, "diffEmployees", "toCreate: %v, toUpdate: %v, toDelete: %v, unchanged: %v", - toCudEmployees.ToCreate, toCudEmployees.ToUpdate, toCudEmployees.ToDelete, toCudEmployees.Unchanged) - succCreate, failCreate, err := r.adapter.CreateEmployees(employer, toCudEmployees.ToCreate) if err != nil { return false, false, fmt.Errorf("syncCreate failed, err: %s", err.Error()) @@ -255,6 +246,12 @@ func (r *Consist) ensureExpectedFinalizer(ctx context.Context, employer client.O } patch := client.MergeFrom(employer.DeepCopyObject().(client.Object)) annos := employer.GetAnnotations() + if annos == nil { + annos = make(map[string]string) + } + if annos[expectedFinalizerAddedAnnoKey] == strings.Join(notDeletedPodNames, ",") { + return len(notDeletedPodNames) == 0, nil + } annos[expectedFinalizerAddedAnnoKey] = strings.Join(notDeletedPodNames, ",") employer.SetAnnotations(annos) return len(notDeletedPodNames) == 0, r.Client.Patch(ctx, employer, patch) @@ -302,6 +299,12 @@ func (r *Consist) ensureExpectedFinalizer(ctx context.Context, employer client.O patch := client.MergeFrom(employer.DeepCopyObject().(client.Object)) annos := employer.GetAnnotations() + if annos == nil { + annos = make(map[string]string) + } + if annos[expectedFinalizerAddedAnnoKey] == strings.Join(addedNames, ",") { + return len(addedNames) == 0, nil + } annos[expectedFinalizerAddedAnnoKey] = strings.Join(addedNames, ",") employer.SetAnnotations(annos) return len(addedNames) == 0, r.Client.Patch(ctx, employer, patch) diff --git a/pkg/controllers/resourceconsist/resourceconsist_controller.go b/pkg/controllers/resourceconsist/resourceconsist_controller.go index e49c27fa..6b0f1c58 100644 --- a/pkg/controllers/resourceconsist/resourceconsist_controller.go +++ b/pkg/controllers/resourceconsist/resourceconsist_controller.go @@ -117,6 +117,8 @@ type Consist struct { } // +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=core,resources=pods/status,verbs=get;update;patch func (r *Consist) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { var employer client.Object diff --git a/pkg/controllers/resourceconsist/resourceconsist_controller_suite_test.go b/pkg/controllers/resourceconsist/resourceconsist_controller_suite_test.go new file mode 100644 index 00000000..b9c48f5b --- /dev/null +++ b/pkg/controllers/resourceconsist/resourceconsist_controller_suite_test.go @@ -0,0 +1,522 @@ +/* +Copyright 2023 The KusionStack Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourceconsist + +import ( + "context" + "encoding/json" + "fmt" + "reflect" + "strconv" + "strings" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type DemoReconcileAdapter struct { + client.Client +} + +var _ ReconcileAdapter = &DemoReconcileAdapter{} + +func NewDemoReconcileAdapter(c client.Client) *DemoReconcileAdapter { + return &DemoReconcileAdapter{ + Client: c, + } +} + +func (r *DemoReconcileAdapter) GetSelectedEmployeeNames(ctx context.Context, employer client.Object) ([]string, error) { + svc, ok := employer.(*corev1.Service) + if !ok { + return nil, fmt.Errorf("expect employer kind is Service") + } + selector := labels.Set(svc.Spec.Selector).AsSelectorPreValidated() + var podList corev1.PodList + err := r.List(ctx, &podList, &client.ListOptions{Namespace: svc.Namespace, LabelSelector: selector}) + if err != nil { + return nil, err + } + + selected := make([]string, len(podList.Items)) + for idx, pod := range podList.Items { + selected[idx] = pod.Name + } + + return selected, nil +} + +func (r *DemoReconcileAdapter) GetControllerName() string { + return "demo-controller" +} + +func (r *DemoReconcileAdapter) NotFollowPodOpsLifeCycle() bool { + return false +} + +func (r *DemoReconcileAdapter) GetExpectEmployer(ctx context.Context, employer client.Object) ([]IEmployer, error) { + if !employer.GetDeletionTimestamp().IsZero() { + return nil, nil + } + var expect []IEmployer + expect = append(expect, DemoServiceStatus{ + EmployerId: "demo-expect-employer-id", + EmployerStatuses: DemoServiceDetails{ + RemoteVIP: "demo-remote-VIP", + RemoteVIPQPS: 100, + }, + }) + return expect, nil +} + +func (r *DemoReconcileAdapter) GetCurrentEmployer(ctx context.Context, employer client.Object) ([]IEmployer, error) { + var current []IEmployer + if employer.GetAnnotations()["demo-current-employer"] == "" { + return current, nil + } + var currentDemoServiceStatus []DemoServiceStatus + err := json.Unmarshal([]byte(employer.GetAnnotations()["demo-current-employer"]), ¤tDemoServiceStatus) + if err != nil { + return current, err + } + for _, employerStatus := range currentDemoServiceStatus { + current = append(current, employerStatus) + } + return current, nil +} + +func (r *DemoReconcileAdapter) CreateEmployer(employer client.Object, toCreate []IEmployer) ([]IEmployer, []IEmployer, error) { + var currentDemoServiceStatus []DemoServiceStatus + if employer.GetAnnotations()["demo-current-employer"] != "" { + err := json.Unmarshal([]byte(employer.GetAnnotations()["demo-current-employer"]), ¤tDemoServiceStatus) + if err != nil { + return nil, toCreate, err + } + } + for _, create := range toCreate { + createDemoServiceStatus, ok := create.(DemoServiceStatus) + if !ok { + return nil, toCreate, fmt.Errorf("toCreate employer is not DemoServiceStatus") + } + currentDemoServiceStatus = append(currentDemoServiceStatus, createDemoServiceStatus) + } + + patch := client.MergeFrom(employer.DeepCopyObject().(client.Object)) + annos := employer.GetAnnotations() + if annos == nil { + annos = make(map[string]string) + } + annos["demo-current-employer"] = "" + if currentDemoServiceStatus != nil { + b, err := json.Marshal(currentDemoServiceStatus) + if err != nil { + return nil, toCreate, err + } + annos["demo-current-employer"] = string(b) + } + employer.SetAnnotations(annos) + err := r.Patch(context.Background(), employer, patch) + if err != nil { + return nil, toCreate, err + } + + return toCreate, nil, nil +} + +func (r *DemoReconcileAdapter) UpdateEmployer(employer client.Object, toUpdate []IEmployer) ([]IEmployer, []IEmployer, error) { + var currentDemoServiceStatus []DemoServiceStatus + if employer.GetAnnotations()["demo-current-employer"] != "" { + err := json.Unmarshal([]byte(employer.GetAnnotations()["demo-current-employer"]), ¤tDemoServiceStatus) + if err != nil { + return nil, toUpdate, err + } + } + + toUpdateEmployerMap := make(map[string]IEmployer) + updated := make(map[string]bool) + for _, update := range toUpdate { + toUpdateEmployerMap[update.GetEmployerId()] = update + } + + for idx, cur := range currentDemoServiceStatus { + if toUpdateEmployerStatus, ok := toUpdateEmployerMap[cur.EmployerId]; ok { + currentDemoServiceStatus[idx] = toUpdateEmployerStatus.(DemoServiceStatus) + updated[cur.EmployerId] = true + } + } + + patch := client.MergeFrom(employer.DeepCopyObject().(client.Object)) + annos := employer.GetAnnotations() + annos["demo-current-employer"] = "" + if currentDemoServiceStatus != nil { + b, err := json.Marshal(currentDemoServiceStatus) + if err != nil { + return nil, toUpdate, err + } + annos["demo-current-employer"] = string(b) + } + employer.SetAnnotations(annos) + err := r.Patch(context.Background(), employer, patch) + if err != nil { + return nil, toUpdate, err + } + + var succ []IEmployer + var fail []IEmployer + for employerId, updatedSucc := range updated { + if updatedSucc { + succ = append(succ, toUpdateEmployerMap[employerId]) + } else { + fail = append(fail, toUpdateEmployerMap[employerId]) + } + } + + return succ, fail, nil +} + +func (r *DemoReconcileAdapter) DeleteEmployer(employer client.Object, toDelete []IEmployer) ([]IEmployer, []IEmployer, error) { + var currentDemoServiceStatus []DemoServiceStatus + if employer.GetAnnotations()["demo-current-employer"] == "" { + return toDelete, nil, nil + } + + err := json.Unmarshal([]byte(employer.GetAnnotations()["demo-current-employer"]), ¤tDemoServiceStatus) + if err != nil { + return nil, toDelete, err + } + + toDeleteEmployerMap := make(map[string]IEmployer) + deleted := make(map[string]bool) + for _, del := range toDelete { + toDeleteEmployerMap[del.GetEmployerId()] = del + } + + var afterDeletedDemoServiceStatus []DemoServiceStatus + for _, cur := range currentDemoServiceStatus { + if _, ok := toDeleteEmployerMap[cur.EmployerId]; ok { + deleted[cur.EmployerId] = true + continue + } + afterDeletedDemoServiceStatus = append(afterDeletedDemoServiceStatus, cur) + } + + patch := client.MergeFrom(employer.DeepCopyObject().(client.Object)) + annos := employer.GetAnnotations() + if annos == nil { + return toDelete, nil, nil + } + annos["demo-current-employer"] = "" + if afterDeletedDemoServiceStatus != nil { + b, err := json.Marshal(afterDeletedDemoServiceStatus) + if err != nil { + return nil, toDelete, err + } + annos["demo-current-employer"] = string(b) + } + + employer.SetAnnotations(annos) + err = r.Patch(context.Background(), employer, patch) + if err != nil { + return nil, toDelete, err + } + + var succ []IEmployer + var fail []IEmployer + for employerId, delSucc := range deleted { + if delSucc { + succ = append(succ, toDeleteEmployerMap[employerId]) + } else { + fail = append(fail, toDeleteEmployerMap[employerId]) + } + } + + return succ, fail, nil +} + +// GetExpectEmployeeStatus return expect employee status +func (r *DemoReconcileAdapter) GetExpectEmployee(ctx context.Context, employer client.Object) ([]IEmployee, error) { + if !employer.GetDeletionTimestamp().IsZero() { + return []IEmployee{}, nil + } + + svc, ok := employer.(*corev1.Service) + if !ok { + return nil, fmt.Errorf("expect employer kind is Service") + } + selector := labels.Set(svc.Spec.Selector).AsSelectorPreValidated() + + var podList corev1.PodList + err := r.List(ctx, &podList, &client.ListOptions{Namespace: svc.Namespace, LabelSelector: selector}) + if err != nil { + return nil, err + } + + expected := make([]IEmployee, len(podList.Items)) + expectIdx := 0 + for _, pod := range podList.Items { + if !pod.DeletionTimestamp.IsZero() { + continue + } + status := DemoPodStatus{ + EmployeeId: pod.Name, + EmployeeName: pod.Name, + } + employeeStatuses, err := GetCommonPodEmployeeStatus(&pod) + if err != nil { + return nil, err + } + extraStatus := PodExtraStatus{} + if employeeStatuses.LifecycleReady { + extraStatus.TrafficOn = true + extraStatus.TrafficWeight = 100 + } else { + extraStatus.TrafficOn = false + extraStatus.TrafficWeight = 0 + } + employeeStatuses.ExtraStatus = extraStatus + status.EmployeeStatuses = employeeStatuses + expected[expectIdx] = status + expectIdx++ + } + + return expected[:expectIdx], nil +} + +func (r *DemoReconcileAdapter) GetCurrentEmployee(ctx context.Context, employer client.Object) ([]IEmployee, error) { + svc, ok := employer.(*corev1.Service) + if !ok { + return nil, fmt.Errorf("expect employer kind is Service") + } + + if svc.GetAnnotations()["demo-added-pods"] == "" { + return nil, nil + } + + addedPodNames := strings.Split(svc.GetAnnotations()["demo-added-pods"], ",") + current := make([]IEmployee, len(addedPodNames)) + currentIdx := 0 + + for _, podName := range addedPodNames { + pod := &corev1.Pod{} + err := r.Get(ctx, types.NamespacedName{ + Namespace: employer.GetNamespace(), + Name: podName, + }, pod) + if err != nil { + if errors.IsNotFound(err) { + continue + } + return nil, err + } + status := DemoPodStatus{ + EmployeeId: podName, + EmployeeName: podName, + } + employeeStatus, err := GetCommonPodEmployeeStatus(pod) + if err != nil { + return nil, err + } + extraStatus := PodExtraStatus{} + if pod.GetLabels()["demo-traffic-on"] == "true" { + extraStatus.TrafficOn = true + } + if pod.GetLabels()["demo-traffic-weight"] != "" { + extraStatus.TrafficWeight, _ = strconv.Atoi(pod.GetLabels()["demo-traffic-weight"]) + } + employeeStatus.ExtraStatus = extraStatus + status.EmployeeStatuses = employeeStatus + current[currentIdx] = status + currentIdx++ + } + return current[:currentIdx], nil +} + +func (r *DemoReconcileAdapter) CreateEmployees(employer client.Object, toCreate []IEmployee) ([]IEmployee, []IEmployee, error) { + if employer == nil { + return nil, nil, fmt.Errorf("employer is nil") + } + if len(toCreate) == 0 { + return toCreate, nil, nil + } + toAddNames := make([]string, len(toCreate)) + toAddIdx := 0 + for _, employee := range toCreate { + toAddNames[toAddIdx] = employee.GetEmployeeName() + toAddIdx++ + } + toAddNames = toAddNames[:toAddIdx] + anno := employer.GetAnnotations() + if anno["demo-added-pods"] == "" { + anno["demo-added-pods"] = strings.Join(toAddNames, ",") + } else { + anno["demo-added-pods"] = anno["demo-added-pods"] + "," + strings.Join(toAddNames, ",") + } + employer.SetAnnotations(anno) + err := r.Client.Update(context.Background(), employer) + if err != nil { + return nil, nil, err + } + return toCreate, nil, nil +} + +func (r *DemoReconcileAdapter) UpdateEmployees(employer client.Object, toUpdate []IEmployee) ([]IEmployee, []IEmployee, error) { + if employer == nil { + return nil, nil, fmt.Errorf("employer is nil") + } + if len(toUpdate) == 0 { + return toUpdate, nil, nil + } + succUpdate := make([]IEmployee, len(toUpdate)) + failUpdate := make([]IEmployee, len(toUpdate)) + succUpdateIdx, failUpdateIdx := 0, 0 + for _, employee := range toUpdate { + pod := &corev1.Pod{} + err := r.Get(context.Background(), types.NamespacedName{ + Namespace: employer.GetNamespace(), + Name: employee.GetEmployeeName(), + }, pod) + podEmployeeStatus := employee.GetEmployeeStatuses().(PodEmployeeStatuses) + if err != nil { + return succUpdate, failUpdate, err + } + extraStatus := podEmployeeStatus.ExtraStatus.(PodExtraStatus) + if extraStatus.TrafficOn { + pod.GetLabels()["demo-traffic-on"] = "true" + } else { + pod.GetLabels()["demo-traffic-on"] = "false" + } + pod.GetLabels()["demo-traffic-weight"] = strconv.Itoa(extraStatus.TrafficWeight) + err = r.Client.Update(context.Background(), pod) + if err != nil { + failUpdate[failUpdateIdx] = employee + failUpdateIdx++ + continue + } + succUpdate[succUpdateIdx] = employee + succUpdateIdx++ + } + return succUpdate[:succUpdateIdx], failUpdate[:failUpdateIdx], nil +} + +func (r *DemoReconcileAdapter) DeleteEmployees(employer client.Object, toDelete []IEmployee) ([]IEmployee, []IEmployee, error) { + if employer == nil { + return nil, nil, fmt.Errorf("employer is nil") + } + if len(toDelete) == 0 { + return toDelete, nil, nil + } + + toDeleteMap := make(map[string]bool) + for _, employee := range toDelete { + toDeleteMap[employee.GetEmployeeName()] = true + } + + addedPodNames := strings.Split(employer.GetAnnotations()["demo-added-pods"], ",") + + afterDeleteIdx := 0 + for _, added := range addedPodNames { + if !toDeleteMap[added] { + addedPodNames[afterDeleteIdx] = added + afterDeleteIdx++ + } + } + addedPodNames = addedPodNames[:afterDeleteIdx] + anno := employer.GetAnnotations() + anno["demo-added-pods"] = strings.Join(addedPodNames, ",") + employer.SetAnnotations(anno) + err := r.Client.Update(context.Background(), employer) + if err != nil { + return nil, nil, err + } + return toDelete, nil, nil +} + +var _ IEmployer = DemoServiceStatus{} +var _ IEmployee = DemoPodStatus{} + +type DemoServiceStatus struct { + EmployerId string + EmployerStatuses DemoServiceDetails +} + +type DemoServiceDetails struct { + RemoteVIP string + RemoteVIPQPS int +} + +func (d DemoServiceStatus) GetEmployerId() string { + return d.EmployerId +} + +func (d DemoServiceStatus) GetEmployerStatuses() interface{} { + return d.EmployerStatuses +} + +func (d DemoServiceStatus) EmployerEqual(employer IEmployer) (bool, error) { + if d.EmployerId != employer.GetEmployerId() { + return false, nil + } + employerStatus, ok := employer.GetEmployerStatuses().(DemoServiceDetails) + if !ok { + return false, fmt.Errorf("employer to diff is not Demo Service Details") + } + return reflect.DeepEqual(d.EmployerStatuses, employerStatus), nil +} + +type DemoPodStatus struct { + EmployeeId string + EmployeeName string + EmployeeStatuses PodEmployeeStatuses +} + +func (d DemoPodStatus) GetEmployeeId() string { + return d.EmployeeId +} + +func (d DemoPodStatus) GetEmployeeName() string { + return d.EmployeeName +} + +func (d DemoPodStatus) GetEmployeeStatuses() interface{} { + return d.EmployeeStatuses +} + +func (d DemoPodStatus) EmployeeEqual(employeeStatus IEmployee) (bool, error) { + if d.EmployeeName != employeeStatus.GetEmployeeName() { + return false, nil + } + + podEmployeeStatuses, ok := employeeStatus.GetEmployeeStatuses().(PodEmployeeStatuses) + if !ok { + return false, fmt.Errorf("employee to diff is not Pod Employee status") + } + + if d.EmployeeStatuses.Ip != podEmployeeStatuses.Ip || d.EmployeeStatuses.LifecycleReady != podEmployeeStatuses.LifecycleReady || + d.EmployeeStatuses.Ipv6 != podEmployeeStatuses.Ipv6 { + return false, nil + } + + return reflect.DeepEqual(d.EmployeeStatuses.ExtraStatus, podEmployeeStatuses.ExtraStatus), nil +} + +type PodExtraStatus struct { + TrafficOn bool + TrafficWeight int +} diff --git a/pkg/controllers/resourceconsist/resourceconsist_controller_test.go b/pkg/controllers/resourceconsist/resourceconsist_controller_test.go new file mode 100644 index 00000000..8fccc2f7 --- /dev/null +++ b/pkg/controllers/resourceconsist/resourceconsist_controller_test.go @@ -0,0 +1,484 @@ +/* +Copyright 2023 The KusionStack Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourceconsist + +import ( + "context" + "os" + "path/filepath" + "strings" + "testing" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager" + + "kusionstack.io/kafed/apis" + "kusionstack.io/kafed/apis/apps/v1alpha1" + "kusionstack.io/kafed/pkg/utils/inject" +) + +var ( + env *envtest.Environment + mgr manager.Manager + + ctx context.Context + cancel context.CancelFunc +) + +var _ = Describe("resource-consist-controller", func() { + service := corev1.Service{ + ObjectMeta: v1.ObjectMeta{ + Name: "resource-consist-ut-svc", + Namespace: "default", + Labels: map[string]string{ + v1alpha1.ControlledByKusionStackLabelKey: "true", + }, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "tcp-80", + Port: 80, + Protocol: corev1.ProtocolTCP, + }, + }, + Selector: map[string]string{ + "resource-consist-ut": "resource-consist-ut", + }, + }, + } + + Context("clean finalizer added", func() { + It("clean finalizer added if service not deleting", func() { + err := mgr.GetClient().Create(context.Background(), &service) + Expect(err).NotTo(HaveOccurred()) + Eventually(func() bool { + service1 := corev1.Service{} + Expect(mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: service.Name, + Namespace: service.Namespace, + }, &service1)).Should(BeNil()) + for _, flz := range service1.GetFinalizers() { + if flz == cleanFinalizerPrefix+service.GetName() { + return true + } + } + return false + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue()) + }) + }) + + Context("employer synced", func() { + It("employer created", func() { + Eventually(func() bool { + service1 := corev1.Service{} + Expect(mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: service.Name, + Namespace: service.Namespace, + }, &service1)).Should(BeNil()) + return service1.GetAnnotations()["demo-current-employer"] == "[{\"EmployerId\":\"demo-expect-employer-id\",\"EmployerStatuses\":{\"RemoteVIP\":\"demo-remote-VIP\",\"RemoteVIPQPS\":100}}]" + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue()) + }) + + It("employer updated", func() { + service1 := corev1.Service{} + Expect(mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: service.Name, + Namespace: service.Namespace, + }, &service1)).Should(BeNil()) + patch := client.MergeFrom(service1.DeepCopy()) + annos := service1.GetAnnotations() + if annos == nil { + annos = make(map[string]string) + } + annos["demo-current-employer"] = "[{\"EmployerId\":\"demo-expect-employer-id\",\"EmployerStatuses\":{\"RemoteVIP\":\"demo-remote-VIP\",\"RemoteVIPQPS\":200}}]" + service1.SetAnnotations(annos) + Expect(mgr.GetClient().Patch(context.Background(), &service1, patch)).Should(BeNil()) + Eventually(func() bool { + service1 := corev1.Service{} + Expect(mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: service.Name, + Namespace: service.Namespace, + }, &service1)).Should(BeNil()) + return service1.GetAnnotations()["demo-current-employer"] == "[{\"EmployerId\":\"demo-expect-employer-id\",\"EmployerStatuses\":{\"RemoteVIP\":\"demo-remote-VIP\",\"RemoteVIPQPS\":100}}]" + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue()) + }) + + It("employer deleted", func() { + Eventually(func() bool { + service1 := corev1.Service{} + Expect(mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: service.Name, + Namespace: service.Namespace, + }, &service1)).Should(BeNil()) + flzs := service1.GetFinalizers() + flzs = append(flzs, "kusionstack.io/ut-block-finalizer") + service1.SetFinalizers(flzs) + return mgr.GetClient().Update(context.TODO(), &service1) == nil + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue()) + + Expect(mgr.GetClient().Delete(context.TODO(), &service)).Should(BeNil()) + Eventually(func() bool { + service1 := corev1.Service{} + Expect(mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: service.Name, + Namespace: service.Namespace, + }, &service1)).Should(BeNil()) + return !service1.GetDeletionTimestamp().IsZero() + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue()) + Eventually(func() bool { + service1 := corev1.Service{} + Expect(mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: service.Name, + Namespace: service.Namespace, + }, &service1)).Should(BeNil()) + return service1.GetAnnotations()["demo-current-employer"] == "" + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue()) + + Eventually(func() bool { + service1 := corev1.Service{} + Expect(mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: service.Name, + Namespace: service.Namespace, + }, &service1)).Should(BeNil()) + var flzs []string + for _, flz := range service1.GetFinalizers() { + if flz == "kusionstack.io/ut-block-finalizer" { + continue + } + flzs = append(flzs, flz) + } + service1.SetFinalizers(flzs) + return mgr.GetClient().Update(context.TODO(), &service1) == nil + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue()) + + Eventually(func() bool { + service1 := corev1.Service{} + err := mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: service.Name, + Namespace: service.Namespace, + }, &service1) + return errors.IsNotFound(err) + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue()) + }) + }) + + Context("employee synced", func() { + svc := corev1.Service{ + ObjectMeta: v1.ObjectMeta{ + Name: "resource-consist-ut-svc-1", + Namespace: "default", + Labels: map[string]string{ + v1alpha1.ControlledByKusionStackLabelKey: "true", + }, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "tcp-80", + Port: 80, + Protocol: corev1.ProtocolTCP, + }, + }, + Selector: map[string]string{ + "resource-consist-ut": "resource-consist-ut-1", + }, + }, + } + + pod := corev1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Name: "resource-consist-ut-pod", + Namespace: "default", + Labels: map[string]string{ + v1alpha1.ControlledByKusionStackLabelKey: "true", + "resource-consist-ut": "resource-consist-ut-1", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx:latest", + }, + }, + ReadinessGates: []corev1.PodReadinessGate{ + { + ConditionType: v1alpha1.ReadinessGatePodServiceReady, + }, + }, + }, + } + + It("employee synced, employer created", func() { + Expect(mgr.GetClient().Create(context.Background(), &svc)).Should(BeNil()) + Eventually(func() bool { + service1 := corev1.Service{} + err := mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: svc.Name, + Namespace: svc.Namespace, + }, &service1) + if err != nil { + return false + } + return service1.GetAnnotations()["demo-current-employer"] == "[{\"EmployerId\":\"demo-expect-employer-id\",\"EmployerStatuses\":{\"RemoteVIP\":\"demo-remote-VIP\",\"RemoteVIPQPS\":100}}]" + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue()) + }) + + It("employee synced, employees created", func() { + Expect(mgr.GetClient().Create(context.TODO(), &pod)).Should(BeNil()) + Eventually(func() bool { + pod1 := corev1.Pod{} + err := mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: pod.Name, + Namespace: pod.Namespace, + }, &pod1) + if err != nil { + return false + } + pod1.Status = corev1.PodStatus{ + PodIP: "1.2.3.4", + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + { + Type: v1alpha1.ReadinessGatePodServiceReady, + Status: corev1.ConditionTrue, + }, + }, + } + return mgr.GetClient().Status().Update(context.TODO(), &pod1) == nil + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue()) + + Eventually(func() bool { + service1 := corev1.Service{} + Expect(mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: svc.Name, + Namespace: svc.Namespace, + }, &service1)).Should(BeNil()) + return service1.GetAnnotations()["demo-added-pods"] == pod.Name + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue()) + + Eventually(func() bool { + pod1 := corev1.Pod{} + _ = mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: pod.Name, + Namespace: pod.Namespace, + }, &pod1) + containsLifecycleFlz := false + for _, flz := range pod1.GetFinalizers() { + if flz == GenerateLifecycleFinalizer(svc.Name) { + containsLifecycleFlz = true + break + } + } + return containsLifecycleFlz && pod1.GetLabels()["demo-traffic-on"] == "true" && + pod1.GetLabels()["demo-traffic-weight"] == "100" + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue()) + + Eventually(func() bool { + service1 := corev1.Service{} + err := mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: svc.Name, + Namespace: svc.Namespace, + }, &service1) + if err != nil { + return false + } + return service1.GetAnnotations()[expectedFinalizerAddedAnnoKey] == pod.Name + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue()) + }) + + It("employee synced, employees updated", func() { + Eventually(func() bool { + pod1 := corev1.Pod{} + err := mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: pod.Name, + Namespace: pod.Namespace, + }, &pod1) + if err != nil { + return false + } + pod1.Status.Conditions = []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + { + Type: v1alpha1.ReadinessGatePodServiceReady, + Status: corev1.ConditionFalse, + }, + } + return mgr.GetClient().Status().Update(context.TODO(), &pod1) == nil + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue()) + + Eventually(func() bool { + pod1 := corev1.Pod{} + err := mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: pod.Name, + Namespace: pod.Namespace, + }, &pod1) + if err != nil { + return false + } + containsLifecycleFlz := false + for _, flz := range pod1.GetFinalizers() { + if flz == GenerateLifecycleFinalizer(svc.Name) { + containsLifecycleFlz = true + break + } + } + return !containsLifecycleFlz && pod1.GetLabels()["demo-traffic-on"] == "false" && + pod1.GetLabels()["demo-traffic-weight"] == "0" + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue()) + + Eventually(func() bool { + service1 := corev1.Service{} + Expect(mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: svc.Name, + Namespace: svc.Namespace, + }, &service1)).Should(BeNil()) + return service1.GetAnnotations()["demo-added-pods"] == pod.Name + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue()) + + Eventually(func() bool { + pod1 := corev1.Pod{} + err := mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: pod.Name, + Namespace: pod.Namespace, + }, &pod1) + if err != nil { + return false + } + pod1.Status.Conditions = []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + { + Type: v1alpha1.ReadinessGatePodServiceReady, + Status: corev1.ConditionTrue, + }, + } + return mgr.GetClient().Status().Update(context.TODO(), &pod1) == nil + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue()) + + Eventually(func() bool { + pod1 := corev1.Pod{} + err := mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: pod.Name, + Namespace: pod.Namespace, + }, &pod1) + if err != nil { + return false + } + return pod1.GetLabels()["demo-traffic-on"] == "true" && pod1.GetLabels()["demo-traffic-weight"] == "100" + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue()) + }) + + It("employee synced, employees deleted", func() { + pod1 := corev1.Pod{} + Expect(mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: pod.Name, + Namespace: pod.Namespace, + }, &pod1)).Should(BeNil()) + + Expect(mgr.GetClient().Delete(context.TODO(), &pod1)).Should(BeNil()) + + Eventually(func() bool { + pod1 := corev1.Pod{} + err := mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: pod.Name, + Namespace: pod.Namespace, + }, &pod1) + return errors.IsNotFound(err) + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue()) + + Eventually(func() bool { + service1 := corev1.Service{} + Expect(mgr.GetClient().Get(context.TODO(), types.NamespacedName{ + Name: svc.Name, + Namespace: svc.Namespace, + }, &service1)).Should(BeNil()) + return !strings.Contains(service1.GetAnnotations()["demo-added-pods"], pod.Name) && + !strings.Contains(service1.GetAnnotations()[expectedFinalizerAddedAnnoKey], pod.Name) + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue()) + }) + }) +}) + +var _ = BeforeSuite(func() { + By("bootstrapping test environment") + + ctx, cancel = context.WithCancel(context.TODO()) + logf.SetLogger(zap.New(zap.WriteTo(os.Stdout), zap.UseDevMode(true))) + + env = &envtest.Environment{ + CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crd", "bases")}, + } + + config, err := env.Start() + Expect(err).NotTo(HaveOccurred()) + Expect(config).NotTo(BeNil()) + + mgr, err = manager.New(config, manager.Options{ + MetricsBindAddress: "0", + NewCache: inject.NewCacheWithFieldIndex, + }) + Expect(err).NotTo(HaveOccurred()) + + scheme := mgr.GetScheme() + err = appsv1.SchemeBuilder.AddToScheme(scheme) + Expect(err).NotTo(HaveOccurred()) + err = apis.AddToScheme(scheme) + Expect(err).NotTo(HaveOccurred()) + + err = AddToMgr(mgr, NewDemoReconcileAdapter(mgr.GetClient())) + Expect(err).NotTo(HaveOccurred()) + + go func() { + err = mgr.Start(ctx) + Expect(err).NotTo(HaveOccurred()) + }() +}) + +var _ = AfterSuite(func() { + By("tearing down the test environment") + + cancel() + + err := env.Stop() + Expect(err).NotTo(HaveOccurred()) +}) + +func TestResourceConsistController(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "resource consist controller test") +} diff --git a/pkg/controllers/resourceconsist/types.go b/pkg/controllers/resourceconsist/types.go index 9f25e923..d3ef0542 100644 --- a/pkg/controllers/resourceconsist/types.go +++ b/pkg/controllers/resourceconsist/types.go @@ -54,16 +54,16 @@ type ReconcileAdapter interface { GetExpectEmployer(ctx context.Context, employer client.Object) ([]IEmployer, error) GetCurrentEmployer(ctx context.Context, employer client.Object) ([]IEmployer, error) + // todo, ctx as an input CreateEmployer(employer client.Object, toCreate []IEmployer) ([]IEmployer, []IEmployer, error) UpdateEmployer(employer client.Object, toUpdate []IEmployer) ([]IEmployer, []IEmployer, error) DeleteEmployer(employer client.Object, toDelete []IEmployer) ([]IEmployer, []IEmployer, error) - RecordEmployer(succCreate, succUpdate, succDelete []IEmployer) error - // GetExpectEmployee and GetCurrentEmployee return expect/current status of employees from related backend provider GetExpectEmployee(ctx context.Context, employer client.Object) ([]IEmployee, error) GetCurrentEmployee(ctx context.Context, employer client.Object) ([]IEmployee, error) + // todo, ctx as an input CreateEmployees(employer client.Object, toCreate []IEmployee) ([]IEmployee, []IEmployee, error) UpdateEmployees(employer client.Object, toUpdate []IEmployee) ([]IEmployee, []IEmployee, error) DeleteEmployees(employer client.Object, toDelete []IEmployee) ([]IEmployee, []IEmployee, error) @@ -72,7 +72,7 @@ type ReconcileAdapter interface { type IEmployer interface { GetEmployerId() string GetEmployerStatuses() interface{} - EmployerEqual(employerStatuses interface{}) (bool, error) + EmployerEqual(employer IEmployer) (bool, error) } type IEmployee interface {