From eeb9dd4d9d4e069986b0cd9e53dcb21193ab58c5 Mon Sep 17 00:00:00 2001 From: dayko Date: Tue, 4 Jul 2023 15:59:23 +0800 Subject: [PATCH] hp: support cronjob as external horizontal portrait algorithm job --- .../externaljob/jobcontroller/cron_job.go | 123 ++++++++++++++++++ .../jobcontroller/cron_job_test.go | 83 ++++++++++++ 2 files changed, 206 insertions(+) create mode 100644 pkg/portrait/algorithm/externaljob/jobcontroller/cron_job.go create mode 100644 pkg/portrait/algorithm/externaljob/jobcontroller/cron_job_test.go diff --git a/pkg/portrait/algorithm/externaljob/jobcontroller/cron_job.go b/pkg/portrait/algorithm/externaljob/jobcontroller/cron_job.go new file mode 100644 index 0000000..6f06fce --- /dev/null +++ b/pkg/portrait/algorithm/externaljob/jobcontroller/cron_job.go @@ -0,0 +1,123 @@ +/* + Copyright 2023 The Kapacity Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package jobcontroller + +import ( + "context" + "fmt" + + batchv1 "k8s.io/api/batch/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + autoscalingv1alpha1 "github.com/traas-stack/kapacity/apis/autoscaling/v1alpha1" + "github.com/traas-stack/kapacity/pkg/util" +) + +type CronJobHorizontal struct { + client client.Client +} + +func NewCronJobHorizontal(client client.Client) Horizontal { + return &CronJobHorizontal{ + client: client, + } +} + +func (c *CronJobHorizontal) UpdateJob(ctx context.Context, hp *autoscalingv1alpha1.HorizontalPortrait, cfg *autoscalingv1alpha1.PortraitAlgorithmJob) error { + cronJobNamespacedName := types.NamespacedName{ + Namespace: hp.Namespace, + Name: hp.Name, + } + cronJob := &batchv1.CronJob{} + if err := c.client.Get(ctx, cronJobNamespacedName, cronJob); err != nil { + if apierrors.IsNotFound(err) { + cronJob = &batchv1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: cronJobNamespacedName.Namespace, + Name: cronJobNamespacedName.Name, + Annotations: cfg.CronJob.Template.Annotations, + Labels: cfg.CronJob.Template.Labels, + OwnerReferences: []metav1.OwnerReference{ + *util.NewControllerRef(hp), + }, + }, + Spec: cfg.CronJob.Template.Spec, + } + + if err := c.client.Create(ctx, cronJob); err != nil { + return fmt.Errorf("failed to create CronJob %q: %v", cronJobNamespacedName, err) + } + + return nil + } + + return fmt.Errorf("failed to get CronJob %q: %v", cronJobNamespacedName, err) + } + + if apiequality.Semantic.DeepEqual(cronJob.Spec, cfg.CronJob.Template.Spec) && + ifUnchanged(cronJob.Labels, cfg.CronJob.Template.Labels) && + ifUnchanged(cronJob.Annotations, cfg.CronJob.Template.Annotations) { + return nil + } + + patch := client.MergeFrom(cronJob.DeepCopy()) + cronJob.Spec = cfg.CronJob.Template.Spec + if err := c.client.Patch(ctx, cronJob, patch); err != nil { + return fmt.Errorf("failed to patch CronJob %q: %v", cronJobNamespacedName, err) + } + + return nil +} + +func (c *CronJobHorizontal) CleanupJob(ctx context.Context, hp *autoscalingv1alpha1.HorizontalPortrait) error { + cronJobNamespacedName := types.NamespacedName{ + Namespace: hp.Namespace, + Name: hp.Name, + } + + cronJob := &batchv1.CronJob{} + if err := c.client.Get(ctx, cronJobNamespacedName, cronJob); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return fmt.Errorf("failed to get CronJob %q: %v", cronJobNamespacedName, err) + } + + if err := client.IgnoreNotFound(c.client.Delete(ctx, cronJob)); err != nil { + return fmt.Errorf("failed to delete CronJob %q: %v", cronJobNamespacedName, err) + } + return nil +} + +// ifUnchanged Compare newMap and oldMap, if the key or value in newMap changes, return true, otherwise return false +func ifUnchanged(oldMap, newMap map[string]string) bool { + if newMap == nil { + return oldMap == nil + } + + for k, v := range newMap { + if vb, ok := oldMap[k]; !ok || vb != v { + return false + } + } + + return true +} diff --git a/pkg/portrait/algorithm/externaljob/jobcontroller/cron_job_test.go b/pkg/portrait/algorithm/externaljob/jobcontroller/cron_job_test.go new file mode 100644 index 0000000..a870820 --- /dev/null +++ b/pkg/portrait/algorithm/externaljob/jobcontroller/cron_job_test.go @@ -0,0 +1,83 @@ +/* + Copyright 2023 The Kapacity Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package jobcontroller + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + batchv1 "k8s.io/api/batch/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + + autoscalingv1alpha1 "github.com/traas-stack/kapacity/apis/autoscaling/v1alpha1" +) + +var ( + scheme = runtime.NewScheme() +) + +func init() { + _ = clientgoscheme.AddToScheme(scheme) + _ = autoscalingv1alpha1.AddToScheme(scheme) +} + +func TestCronJobHorizontal(t *testing.T) { + fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects().Build() + ctx := context.Background() + horizontalPortrait := &autoscalingv1alpha1.HorizontalPortrait{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "test-predictive", + }, + } + + cfg := &autoscalingv1alpha1.PortraitAlgorithmJob{ + Type: autoscalingv1alpha1.CronJobPortraitAlgorithmJobType, + CronJob: &autoscalingv1alpha1.CronJobPortraitAlgorithmJob{ + Template: autoscalingv1alpha1.CronJobTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"cronjob": "test"}, + }, + Spec: batchv1.CronJobSpec{ + Schedule: "* * * * *", + JobTemplate: batchv1.JobTemplateSpec{}, + }, + }, + }, + } + + cronJobHorizontal := NewCronJobHorizontal(fakeClient) + err := cronJobHorizontal.UpdateJob(ctx, horizontalPortrait, cfg) + assert.Nil(t, err) + + cronJob := &batchv1.CronJob{} + _ = fakeClient.Get(ctx, types.NamespacedName{Namespace: horizontalPortrait.Namespace, Name: horizontalPortrait.Name}, cronJob) + assert.NotNil(t, cronJob) + + err = cronJobHorizontal.CleanupJob(ctx, horizontalPortrait) + assert.Nil(t, err) + + err = fakeClient.Get(ctx, types.NamespacedName{Namespace: horizontalPortrait.Namespace, Name: horizontalPortrait.Name}, cronJob) + assert.True(t, apierrors.IsNotFound(err)) +}