From 49de25d28b2de279240dfcc8b1b35095f68f0141 Mon Sep 17 00:00:00 2001 From: Ruben Vargas Date: Mon, 26 Aug 2019 19:18:57 -0500 Subject: [PATCH] Add resource limits for spark dependencies cronjob Signed-off-by: Ruben Vargas --- pkg/apis/jaegertracing/v1/jaeger_types.go | 3 + .../jaegertracing/v1/zz_generated.deepcopy.go | 1 + pkg/cronjob/spark_dependencies.go | 6 +- pkg/cronjob/spark_dependencies_test.go | 64 +++++++++++++++++++ pkg/util/util.go | 5 +- 5 files changed, 76 insertions(+), 3 deletions(-) diff --git a/pkg/apis/jaegertracing/v1/jaeger_types.go b/pkg/apis/jaegertracing/v1/jaeger_types.go index 67dd4e0f5..b46e03b0f 100644 --- a/pkg/apis/jaegertracing/v1/jaeger_types.go +++ b/pkg/apis/jaegertracing/v1/jaeger_types.go @@ -376,6 +376,9 @@ type JaegerDependenciesSpec struct { // +optional TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty"` + + // +optional + Resources v1.ResourceRequirements `json:"resources,omitempty"` } // JaegerEsIndexCleanerSpec holds the options related to es-index-cleaner diff --git a/pkg/apis/jaegertracing/v1/zz_generated.deepcopy.go b/pkg/apis/jaegertracing/v1/zz_generated.deepcopy.go index 6ebceb0b7..535d8ee20 100644 --- a/pkg/apis/jaegertracing/v1/zz_generated.deepcopy.go +++ b/pkg/apis/jaegertracing/v1/zz_generated.deepcopy.go @@ -247,6 +247,7 @@ func (in *JaegerDependenciesSpec) DeepCopyInto(out *JaegerDependenciesSpec) { *out = new(int32) **out = **in } + in.Resources.DeepCopyInto(&out.Resources) return } diff --git a/pkg/cronjob/spark_dependencies.go b/pkg/cronjob/spark_dependencies.go index a6da449f8..18c155ea5 100644 --- a/pkg/cronjob/spark_dependencies.go +++ b/pkg/cronjob/spark_dependencies.go @@ -12,6 +12,7 @@ import ( v1 "github.com/jaegertracing/jaeger-operator/pkg/apis/jaegertracing/v1" "github.com/jaegertracing/jaeger-operator/pkg/storage" + "github.com/jaegertracing/jaeger-operator/pkg/util" ) var supportedStorageTypes = map[string]bool{"elasticsearch": true, "cassandra": true} @@ -33,6 +34,8 @@ func CreateSparkDependencies(jaeger *v1.Jaeger) *batchv1beta1.CronJob { trueVar := true one := int32(1) name := fmt.Sprintf("%s-spark-dependencies", jaeger.Name) + resources := jaeger.Spec.Storage.Dependencies.Resources + util.MergeResources(&resources, jaeger.Spec.Resources) return &batchv1beta1.CronJob{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -68,7 +71,8 @@ func CreateSparkDependencies(jaeger *v1.Jaeger) *batchv1beta1.CronJob { Image: jaeger.Spec.Storage.Dependencies.Image, Name: name, // let spark job use its default values - Env: removeEmptyVars(envVars), + Env: removeEmptyVars(envVars), + Resources: resources, }, }, RestartPolicy: corev1.RestartPolicyNever, diff --git a/pkg/cronjob/spark_dependencies_test.go b/pkg/cronjob/spark_dependencies_test.go index 4952a8760..3614c607f 100644 --- a/pkg/cronjob/spark_dependencies_test.go +++ b/pkg/cronjob/spark_dependencies_test.go @@ -5,6 +5,7 @@ import ( "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" v1 "github.com/jaegertracing/jaeger-operator/pkg/apis/jaegertracing/v1" ) @@ -83,3 +84,66 @@ func TestSparkDependencies(t *testing.T) { cjob := CreateSparkDependencies(j) assert.Equal(t, j.Namespace, cjob.Namespace) } + +func TestSparkDependenciesResources(t *testing.T) { + + parentResources := corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceLimitsCPU: *resource.NewQuantity(1024, resource.BinarySI), + corev1.ResourceLimitsEphemeralStorage: *resource.NewQuantity(512, resource.DecimalSI), + }, + Requests: corev1.ResourceList{ + corev1.ResourceRequestsCPU: *resource.NewQuantity(1024, resource.BinarySI), + corev1.ResourceRequestsEphemeralStorage: *resource.NewQuantity(512, resource.DecimalSI), + }, + } + + dependencyResources := corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceLimitsCPU: *resource.NewQuantity(2048, resource.BinarySI), + corev1.ResourceLimitsEphemeralStorage: *resource.NewQuantity(1024, resource.DecimalSI), + }, + Requests: corev1.ResourceList{ + corev1.ResourceRequestsCPU: *resource.NewQuantity(2048, resource.BinarySI), + corev1.ResourceRequestsEphemeralStorage: *resource.NewQuantity(1024, resource.DecimalSI), + }, + } + + tests := []struct { + jaeger *v1.Jaeger + expected corev1.ResourceRequirements + }{ + { + jaeger: &v1.Jaeger{Spec: v1.JaegerSpec{Storage: v1.JaegerStorageSpec{Type: "elasticsearch"}}}, + expected: corev1.ResourceRequirements{}, + }, + { + jaeger: &v1.Jaeger{Spec: v1.JaegerSpec{ + Storage: v1.JaegerStorageSpec{Type: "elasticsearch"}, + JaegerCommonSpec: v1.JaegerCommonSpec{ + Resources: parentResources, + }, + }}, + expected: parentResources, + }, + { + jaeger: &v1.Jaeger{Spec: v1.JaegerSpec{ + Storage: v1.JaegerStorageSpec{ + Type: "elasticsearch", + Dependencies: v1.JaegerDependenciesSpec{ + Resources: dependencyResources, + }, + }, + JaegerCommonSpec: v1.JaegerCommonSpec{ + Resources: parentResources, + }, + }}, + expected: dependencyResources, + }, + } + for _, test := range tests { + cjob := CreateSparkDependencies(test.jaeger) + assert.Equal(t, test.expected, cjob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Resources) + + } +} diff --git a/pkg/util/util.go b/pkg/util/util.go index 56f8fb53c..7520ba722 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -73,7 +73,7 @@ func Merge(commonSpecs []v1.JaegerCommonSpec) *v1.JaegerCommonSpec { volumes = append(volumes, commonSpec.Volumes...) // Merge resources - mergeResources(resources, commonSpec.Resources) + MergeResources(resources, commonSpec.Resources) // Set the affinity based on the most specific definition available if affinity == nil { @@ -104,7 +104,8 @@ func Merge(commonSpecs []v1.JaegerCommonSpec) *v1.JaegerCommonSpec { } } -func mergeResources(resources *corev1.ResourceRequirements, res corev1.ResourceRequirements) { +// MergeResources returns a merged version of two resource requirements +func MergeResources(resources *corev1.ResourceRequirements, res corev1.ResourceRequirements) { for k, v := range res.Limits { if _, ok := resources.Limits[k]; !ok {