From d0daf2fd1740a6c448baf7091b1976d46ef4a023 Mon Sep 17 00:00:00 2001 From: Yi Chen Date: Thu, 24 Oct 2024 10:23:30 +0800 Subject: [PATCH] Support pod template for Spark 3.x applications (#2141) * Update API definition to support pod template Signed-off-by: Yi Chen * Mark pod template field as schemaless Signed-off-by: Yi Chen * Add kubebuilder marker to preserve unknown fields Signed-off-by: Yi Chen * Add example for using pod template Signed-off-by: Yi Chen * Support pod template Signed-off-by: Yi Chen --------- Signed-off-by: Yi Chen --- Makefile | 2 +- api/v1beta2/sparkapplication_types.go | 8 + api/v1beta2/zz_generated.deepcopy.go | 5 + ...tor.k8s.io_scheduledsparkapplications.yaml | 31 +++ ...parkoperator.k8s.io_sparkapplications.yaml | 31 +++ ...tor.k8s.io_scheduledsparkapplications.yaml | 31 +++ ...parkoperator.k8s.io_sparkapplications.yaml | 31 +++ docs/api-docs.md | 16 ++ examples/spark-pi-pod-template.yaml | 197 ++++++++++++++++++ go.mod | 3 +- .../controller/sparkapplication/controller.go | 27 ++- .../controller/sparkapplication/submission.go | 60 +++++- .../webhook/sparkapplication_defaulter.go | 23 -- .../webhook/sparkapplication_validator.go | 14 ++ pkg/common/spark.go | 3 + pkg/util/util.go | 41 ++++ pkg/util/util_test.go | 72 +++++++ 17 files changed, 566 insertions(+), 29 deletions(-) create mode 100644 examples/spark-pi-pod-template.yaml diff --git a/Makefile b/Makefile index 3283f2a4ab..49a2713c14 100644 --- a/Makefile +++ b/Makefile @@ -109,7 +109,7 @@ print-%: ; @echo $*=$($*) .PHONY: manifests manifests: controller-gen ## Generate CustomResourceDefinition, RBAC and WebhookConfiguration manifests. - $(CONTROLLER_GEN) crd rbac:roleName=spark-operator-controller webhook paths="./..." output:crd:artifacts:config=config/crd/bases + $(CONTROLLER_GEN) crd:generateEmbeddedObjectMeta=true rbac:roleName=spark-operator-controller webhook paths="./..." output:crd:artifacts:config=config/crd/bases .PHONY: generate generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations. diff --git a/api/v1beta2/sparkapplication_types.go b/api/v1beta2/sparkapplication_types.go index 71a810cbf2..c56891187f 100644 --- a/api/v1beta2/sparkapplication_types.go +++ b/api/v1beta2/sparkapplication_types.go @@ -409,6 +409,14 @@ type Dependencies struct { // SparkPodSpec defines common things that can be customized for a Spark driver or executor pod. // TODO: investigate if we should use v1.PodSpec and limit what can be set instead. type SparkPodSpec struct { + // Template is a pod template that can be used to define the driver or executor pod configurations that Spark configurations do not support. + // Spark version >= 3.0.0 is required. + // Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. + // +optional + // +kubebuilder:validation:Schemaless + // +kubebuilder:validation:Type:=object + // +kubebuilder:pruning:PreserveUnknownFields + Template *corev1.PodTemplateSpec `json:"template,omitempty"` // Cores maps to `spark.driver.cores` or `spark.executor.cores` for the driver and executors, respectively. 
// +optional // +kubebuilder:validation:Minimum=1 diff --git a/api/v1beta2/zz_generated.deepcopy.go b/api/v1beta2/zz_generated.deepcopy.go index eb7c6239d5..635e19af58 100644 --- a/api/v1beta2/zz_generated.deepcopy.go +++ b/api/v1beta2/zz_generated.deepcopy.go @@ -876,6 +876,11 @@ func (in *SparkApplicationStatus) DeepCopy() *SparkApplicationStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SparkPodSpec) DeepCopyInto(out *SparkPodSpec) { *out = *in + if in.Template != nil { + in, out := &in.Template, &out.Template + *out = new(v1.PodTemplateSpec) + (*in).DeepCopyInto(*out) + } if in.Cores != nil { in, out := &in.Cores, &out.Cores *out = new(int32) diff --git a/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml b/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml index 0ce3ee05e1..7aa9c4af24 100644 --- a/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml +++ b/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml @@ -4755,6 +4755,13 @@ spec: - name type: object type: array + template: + description: |- + Template is a pod template that can be used to define the driver or executor pod configurations that Spark configurations do not support. + Spark version >= 3.0.0 is required. + Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. + type: object + x-kubernetes-preserve-unknown-fields: true terminationGracePeriodSeconds: description: Termination grace period seconds for the pod format: int64 @@ -9512,6 +9519,13 @@ spec: - name type: object type: array + template: + description: |- + Template is a pod template that can be used to define the driver or executor pod configurations that Spark configurations do not support. + Spark version >= 3.0.0 is required. + Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. + type: object + x-kubernetes-preserve-unknown-fields: true terminationGracePeriodSeconds: description: Termination grace period seconds for the pod format: int64 @@ -10351,6 +10365,23 @@ spec: May contain labels and annotations that will be copied into the PVC when creating it. No other fields are allowed and will be rejected during validation. + properties: + annotations: + additionalProperties: + type: string + type: object + finalizers: + items: + type: string + type: array + labels: + additionalProperties: + type: string + type: object + name: + type: string + namespace: + type: string type: object spec: description: |- diff --git a/charts/spark-operator-chart/crds/sparkoperator.k8s.io_sparkapplications.yaml b/charts/spark-operator-chart/crds/sparkoperator.k8s.io_sparkapplications.yaml index 70034a0f9c..4c839e36ea 100644 --- a/charts/spark-operator-chart/crds/sparkoperator.k8s.io_sparkapplications.yaml +++ b/charts/spark-operator-chart/crds/sparkoperator.k8s.io_sparkapplications.yaml @@ -4694,6 +4694,13 @@ spec: - name type: object type: array + template: + description: |- + Template is a pod template that can be used to define the driver or executor pod configurations that Spark configurations do not support. + Spark version >= 3.0.0 is required. + Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. 
+ type: object + x-kubernetes-preserve-unknown-fields: true terminationGracePeriodSeconds: description: Termination grace period seconds for the pod format: int64 @@ -9421,6 +9428,13 @@ spec: - name type: object type: array + template: + description: |- + Template is a pod template that can be used to define the driver or executor pod configurations that Spark configurations do not support. + Spark version >= 3.0.0 is required. + Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. + type: object + x-kubernetes-preserve-unknown-fields: true terminationGracePeriodSeconds: description: Termination grace period seconds for the pod format: int64 @@ -10257,6 +10271,23 @@ spec: May contain labels and annotations that will be copied into the PVC when creating it. No other fields are allowed and will be rejected during validation. + properties: + annotations: + additionalProperties: + type: string + type: object + finalizers: + items: + type: string + type: array + labels: + additionalProperties: + type: string + type: object + name: + type: string + namespace: + type: string type: object spec: description: |- diff --git a/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml b/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml index 0ce3ee05e1..7aa9c4af24 100644 --- a/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml +++ b/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml @@ -4755,6 +4755,13 @@ spec: - name type: object type: array + template: + description: |- + Template is a pod template that can be used to define the driver or executor pod configurations that Spark configurations do not support. + Spark version >= 3.0.0 is required. + Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. + type: object + x-kubernetes-preserve-unknown-fields: true terminationGracePeriodSeconds: description: Termination grace period seconds for the pod format: int64 @@ -9512,6 +9519,13 @@ spec: - name type: object type: array + template: + description: |- + Template is a pod template that can be used to define the driver or executor pod configurations that Spark configurations do not support. + Spark version >= 3.0.0 is required. + Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. + type: object + x-kubernetes-preserve-unknown-fields: true terminationGracePeriodSeconds: description: Termination grace period seconds for the pod format: int64 @@ -10351,6 +10365,23 @@ spec: May contain labels and annotations that will be copied into the PVC when creating it. No other fields are allowed and will be rejected during validation. + properties: + annotations: + additionalProperties: + type: string + type: object + finalizers: + items: + type: string + type: array + labels: + additionalProperties: + type: string + type: object + name: + type: string + namespace: + type: string type: object spec: description: |- diff --git a/config/crd/bases/sparkoperator.k8s.io_sparkapplications.yaml b/config/crd/bases/sparkoperator.k8s.io_sparkapplications.yaml index 70034a0f9c..4c839e36ea 100644 --- a/config/crd/bases/sparkoperator.k8s.io_sparkapplications.yaml +++ b/config/crd/bases/sparkoperator.k8s.io_sparkapplications.yaml @@ -4694,6 +4694,13 @@ spec: - name type: object type: array + template: + description: |- + Template is a pod template that can be used to define the driver or executor pod configurations that Spark configurations do not support. 
+ Spark version >= 3.0.0 is required. + Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. + type: object + x-kubernetes-preserve-unknown-fields: true terminationGracePeriodSeconds: description: Termination grace period seconds for the pod format: int64 @@ -9421,6 +9428,13 @@ spec: - name type: object type: array + template: + description: |- + Template is a pod template that can be used to define the driver or executor pod configurations that Spark configurations do not support. + Spark version >= 3.0.0 is required. + Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. + type: object + x-kubernetes-preserve-unknown-fields: true terminationGracePeriodSeconds: description: Termination grace period seconds for the pod format: int64 @@ -10257,6 +10271,23 @@ spec: May contain labels and annotations that will be copied into the PVC when creating it. No other fields are allowed and will be rejected during validation. + properties: + annotations: + additionalProperties: + type: string + type: object + finalizers: + items: + type: string + type: array + labels: + additionalProperties: + type: string + type: object + name: + type: string + namespace: + type: string type: object spec: description: |- diff --git a/docs/api-docs.md b/docs/api-docs.md index 531c53f9ab..08f8d42d88 100644 --- a/docs/api-docs.md +++ b/docs/api-docs.md @@ -2828,6 +2828,22 @@ TODO: investigate if we should use v1.PodSpec and limit what can be set instead. +template
+Kubernetes core/v1.PodTemplateSpec
+(Optional)
+Template is a pod template that can be used to define the driver or executor pod configurations that Spark configurations do not support.
+Spark version >= 3.0.0 is required.
+Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template.

cores
int32 diff --git a/examples/spark-pi-pod-template.yaml b/examples/spark-pi-pod-template.yaml new file mode 100644 index 0000000000..552dc0030a --- /dev/null +++ b/examples/spark-pi-pod-template.yaml @@ -0,0 +1,197 @@ +# +# Copyright 2024 The Kubeflow authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-configmap + namespace: default +data: + KEY1: VALUE1 + +--- +apiVersion: v1 +kind: Secret +metadata: + name: test-secret + namespace: default +stringData: + KEY2: VALUE2 + +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-configmap-2 + namespace: default +data: + KEY3: VALUE3 + +--- +apiVersion: v1 +kind: Secret +metadata: + name: test-secret-2 + namespace: default +stringData: + KEY4: VALUE4 + +--- +apiVersion: sparkoperator.k8s.io/v1beta2 +kind: SparkApplication +metadata: + name: spark-pi-pod-template + namespace: default +spec: + type: Scala + mode: cluster + sparkVersion: 3.5.3 + image: spark:3.5.3 + imagePullPolicy: IfNotPresent + mainApplicationFile: local:///opt/spark/examples/jars/spark-examples.jar + mainClass: org.apache.spark.examples.SparkPi + arguments: + - "10000" + driver: + template: + metadata: + labels: + spark.apache.org/version: 3.5.3 + annotations: + spark.apache.org/version: 3.5.3 + spec: + containers: + - name: spark-kubernetes-driver + env: + - name: KEY0 + value: VALUE0 + - name: KEY1 + valueFrom: + configMapKeyRef: + name: test-configmap + key: KEY1 + - name: KEY2 + valueFrom: + secretKeyRef: + name: test-secret + key: KEY2 + envFrom: + - configMapRef: + name: test-configmap-2 + - secretRef: + name: test-secret-2 + ports: + - name: custom-port + containerPort: 12345 + protocol: TCP + # The resources section will not work for cpu/memory requests and limits. + # Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. + resources: + requests: + # Please use `spec.driver.cores` instead. + cpu: 500m + # Please use `spec.driver.memory` and `spec.driver.memoryOverhead` instead. + memory: 512Mi + limits: + # Please use `spec.driver.coreLimit` instead. + cpu: 1 + # Please use `spec.driver.memory` and `spec.driver.memoryOverhead` instead. 
+ memory: 1Gi + nodeSelector: + kubernetes.io/os: linux + affinity: + podAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 1 + podAffinityTerm: + labelSelector: + matchLabels: + spark-app-name: spark-pi-pod-template + topologyKey: kubernetes.io/hostname + tolerations: + - operator: Exists + effect: NoSchedule + serviceAccountName: spark-operator-spark + cores: 1 + coreLimit: "1" + memory: 512m + memoryOverhead: 512m + executor: + instances: 1 + template: + metadata: + labels: + spark.apache.org/version: 3.5.3 + annotations: + spark.apache.org/version: 3.5.3 + spec: + containers: + - name: spark-kubernetes-executor + env: + - name: KEY0 + value: VALUE0 + - name: KEY1 + valueFrom: + configMapKeyRef: + name: test-configmap + key: KEY1 + - name: KEY2 + valueFrom: + secretKeyRef: + name: test-secret + key: KEY2 + envFrom: + - configMapRef: + name: test-configmap-2 + - secretRef: + name: test-secret-2 + volumeMounts: + - name: spark-local-dir-1 + mountPath: /mnt/disk1 + # The resources section will not work for cpu/memory requests and limits. + # Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. + resources: + requests: + # Please use `spec.executor.cores` instead. + cpu: 1 + # Please use `spec.executor.memory` and `spec.executor.memoryOverhead` instead. + memory: 1Gi + limits: + # Please use `spec.executor.coreLimit` instead. + cpu: 1500m + # Please use `spec.executor.memory` and `spec.executor.memoryOverhead` instead. + memory: 1512Mi + volumes: + - name: spark-local-dir-1 + emptyDir: + sizeLimit: 100Mi + nodeSelector: + kubernetes.io/os: linux + affinity: + podAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 1 + podAffinityTerm: + labelSelector: + matchLabels: + spark-app-name: spark-pi-pod-template + topologyKey: kubernetes.io/hostname + tolerations: + - operator: Exists + effect: NoSchedule + cores: 1 + coreLimit: 1500m + memory: 1g + memoryOverhead: 512m diff --git a/go.mod b/go.mod index d5e600b9cb..11af87ff3b 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/stretchr/testify v1.9.0 go.uber.org/zap v1.27.0 gocloud.dev v0.40.0 + golang.org/x/mod v0.20.0 golang.org/x/net v0.30.0 golang.org/x/time v0.7.0 helm.sh/helm/v3 v3.16.2 @@ -30,6 +31,7 @@ require ( k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 sigs.k8s.io/controller-runtime v0.17.5 sigs.k8s.io/scheduler-plugins v0.29.8 + sigs.k8s.io/yaml v1.4.0 volcano.sh/apis v1.9.0 ) @@ -229,7 +231,6 @@ require ( sigs.k8s.io/kustomize/api v0.17.2 // indirect sigs.k8s.io/kustomize/kyaml v0.17.1 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect - sigs.k8s.io/yaml v1.4.0 // indirect ) replace ( diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index 7a707df001..e4cb78d248 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -19,6 +19,7 @@ package sparkapplication import ( "context" "fmt" + "os" "strconv" "time" @@ -643,8 +644,10 @@ func (r *Reconciler) getSparkApplication(key types.NamespacedName) (*v1beta2.Spa // submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit. 
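+// If a driver or executor pod template is specified, it is written to a per-submission file under
+// /tmp/spark/<submission-id>/ and passed to spark-submit via Spark's pod template properties;
+// the file is cleaned up again once the submission attempt finishes (see cleanUpPodTemplateFiles).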
func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (submitErr error) {
	logger.Info("Submitting SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State)

+	// SubmissionID must be set before creating any resources to ensure all the resources are labeled.
	app.Status.SubmissionID = uuid.New().String()
+	app.Status.DriverInfo.PodName = util.GetDriverPodName(app)
	app.Status.LastSubmissionAttemptTime = metav1.Now()
	app.Status.SubmissionAttempts = app.Status.SubmissionAttempts + 1
@@ -736,8 +739,12 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (subm
		}
	}

-	driverPodName := util.GetDriverPodName(app)
-	app.Status.DriverInfo.PodName = driverPodName
+	defer func() {
+		if err := r.cleanUpPodTemplateFiles(app); err != nil {
+			logger.Error(err, "Failed to clean up pod template files", "name", app.Name, "namespace", app.Namespace)
+		}
+	}()
+
	sparkSubmitArgs, err := buildSparkSubmitArgs(app)
	if err != nil {
		return fmt.Errorf("failed to build spark-submit arguments: %v", err)
@@ -746,6 +753,7 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (subm
	// Try submitting the application by running spark-submit.
	logger.Info("Running spark-submit for SparkApplication", "name", app.Name, "namespace", app.Namespace, "arguments", sparkSubmitArgs)
	if err := runSparkSubmit(newSubmission(sparkSubmitArgs, app)); err != nil {
+		r.recordSparkApplicationEvent(app)
		return fmt.Errorf("failed to run spark-submit: %v", err)
	}
	return nil
@@ -1224,3 +1232,18 @@ func (r *Reconciler) cleanUpOnTermination(_, newApp *v1beta2.SparkApplication) e
	}
	return nil
}
+
+// cleanUpPodTemplateFiles cleans up the driver and executor pod template files.
+func (r *Reconciler) cleanUpPodTemplateFiles(app *v1beta2.SparkApplication) error {
+	if app.Spec.Driver.Template == nil && app.Spec.Executor.Template == nil {
+		return nil
+	}
+	path := fmt.Sprintf("/tmp/spark/%s", app.Status.SubmissionID)
+	if err := os.RemoveAll(path); err != nil {
+		if !os.IsNotExist(err) {
+			return err
+		}
+	}
+	logger.V(1).Info("Deleted pod template files", "path", path)
+	return nil
+}
diff --git a/internal/controller/sparkapplication/submission.go b/internal/controller/sparkapplication/submission.go
index 66e4a0be83..d0bb6f5781 100644
--- a/internal/controller/sparkapplication/submission.go
+++ b/internal/controller/sparkapplication/submission.go
@@ -83,15 +83,17 @@ func buildSparkSubmitArgs(app *v1beta2.SparkApplication) ([]string, error) {
		submissionWaitAppCompletionOption,
		sparkConfOption,
		hadoopConfOption,
+		driverPodTemplateOption,
		driverPodNameOption,
		driverConfOption,
-		driverSecretOption,
		driverEnvOption,
+		driverSecretOption,
		driverVolumeMountsOption,
+		executorPodTemplateOption,
		executorConfOption,
+		executorEnvOption,
		executorSecretOption,
		executorVolumeMountsOption,
-		executorEnvOption,
		nodeSelectorOption,
		dynamicAllocationOption,
		proxyUserOption,
@@ -303,6 +305,12 @@ func driverConfOption(app *v1beta2.SparkApplication) ([]string, error) {
	property = fmt.Sprintf(common.SparkKubernetesDriverLabelTemplate, common.LabelLaunchedBySparkOperator)
	args = append(args, "--conf", fmt.Sprintf("%s=%s", property, "true"))

+	// If Spark version is less than 3.0.0 or the driver pod template is not defined, the driver pod needs to be mutated by the webhook.
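+	// Pods built from a template file do not get this label, so the mutating webhook is expected to leave them untouched.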
+ if util.CompareSemanticVersion(app.Spec.SparkVersion, "3.0.0") < 0 || app.Spec.Driver.Template == nil { + property = fmt.Sprintf(common.SparkKubernetesDriverLabelTemplate, common.LabelMutatedBySparkOperator) + args = append(args, "--conf", fmt.Sprintf("%s=%s", property, "true")) + } + property = fmt.Sprintf(common.SparkKubernetesDriverLabelTemplate, common.LabelSubmissionID) args = append(args, "--conf", fmt.Sprintf("%s=%s", property, app.Status.SubmissionID)) @@ -646,6 +654,12 @@ func executorConfOption(app *v1beta2.SparkApplication) ([]string, error) { property = fmt.Sprintf(common.SparkKubernetesExecutorLabelTemplate, common.LabelLaunchedBySparkOperator) args = append(args, "--conf", fmt.Sprintf("%s=%s", property, "true")) + // If Spark version is less than 3.0.0 or executor pod template is not defined, then the executor pods need to be mutated by the webhook. + if util.CompareSemanticVersion(app.Spec.SparkVersion, "3.0.0") < 0 || app.Spec.Executor.Template == nil { + property = fmt.Sprintf(common.SparkKubernetesExecutorLabelTemplate, common.LabelMutatedBySparkOperator) + args = append(args, "--conf", fmt.Sprintf("%s=%s", property, "true")) + } + property = fmt.Sprintf(common.SparkKubernetesExecutorLabelTemplate, common.LabelSubmissionID) args = append(args, "--conf", fmt.Sprintf("%s=%s", property, app.Status.SubmissionID)) @@ -1022,3 +1036,45 @@ func mainApplicationFileOption(app *v1beta2.SparkApplication) ([]string, error) func applicationOption(app *v1beta2.SparkApplication) ([]string, error) { return app.Spec.Arguments, nil } + +// driverPodTemplateOption returns the driver pod template arguments. +func driverPodTemplateOption(app *v1beta2.SparkApplication) ([]string, error) { + if app.Spec.Driver.Template == nil { + return []string{}, nil + } + + podTemplateFile := fmt.Sprintf("/tmp/spark/%s/driver-pod-template.yaml", app.Status.SubmissionID) + if err := util.WriteObjectToFile(app.Spec.Driver.Template, podTemplateFile); err != nil { + return []string{}, err + } + logger.V(1).Info("Created driver pod template file for SparkApplication", "name", app.Name, "namespace", app.Namespace, "file", podTemplateFile) + + args := []string{ + "--conf", + fmt.Sprintf("%s=%s", common.SparkKubernetesDriverPodTemplateFile, podTemplateFile), + "--conf", + fmt.Sprintf("%s=%s", common.SparkKubernetesDriverPodTemplateContainerName, common.SparkDriverContainerName), + } + return args, nil +} + +// executorPodTemplateOption returns the executor pod template arguments. 
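+// The template is marshaled to /tmp/spark/<submission-id>/executor-pod-template.yaml and referenced
+// through Spark's executor pod template conf properties, mirroring driverPodTemplateOption above.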
+func executorPodTemplateOption(app *v1beta2.SparkApplication) ([]string, error) { + if app.Spec.Executor.Template == nil { + return []string{}, nil + } + + podTemplateFile := fmt.Sprintf("/tmp/spark/%s/executor-pod-template.yaml", app.Status.SubmissionID) + if err := util.WriteObjectToFile(app.Spec.Executor.Template, podTemplateFile); err != nil { + return []string{}, err + } + logger.V(1).Info("Created executor pod template file for SparkApplication", "name", app.Name, "namespace", app.Namespace, "file", podTemplateFile) + + args := []string{ + "--conf", + fmt.Sprintf("%s=%s", common.SparkKubernetesExecutorPodTemplateFile, podTemplateFile), + "--conf", + fmt.Sprintf("%s=%s", common.SparkKubernetesExecutorPodTemplateContainerName, common.Spark3DefaultExecutorContainerName), + } + return args, nil +} diff --git a/internal/webhook/sparkapplication_defaulter.go b/internal/webhook/sparkapplication_defaulter.go index 661ecf708a..9c10ea10c2 100644 --- a/internal/webhook/sparkapplication_defaulter.go +++ b/internal/webhook/sparkapplication_defaulter.go @@ -83,32 +83,9 @@ func defaultSparkApplication(app *v1beta2.SparkApplication) { } func defaultDriverSpec(app *v1beta2.SparkApplication) { - if app.Spec.Driver.Cores == nil { - if app.Spec.SparkConf == nil || app.Spec.SparkConf[common.SparkDriverCores] == "" { - app.Spec.Driver.Cores = util.Int32Ptr(1) - } - } - - if app.Spec.Driver.Memory == nil { - if app.Spec.SparkConf == nil || app.Spec.SparkConf[common.SparkDriverMemory] == "" { - app.Spec.Driver.Memory = util.StringPtr("1g") - } - } } func defaultExecutorSpec(app *v1beta2.SparkApplication) { - if app.Spec.Executor.Cores == nil { - if app.Spec.SparkConf == nil || app.Spec.SparkConf[common.SparkExecutorCores] == "" { - app.Spec.Executor.Cores = util.Int32Ptr(1) - } - } - - if app.Spec.Executor.Memory == nil { - if app.Spec.SparkConf == nil || app.Spec.SparkConf[common.SparkExecutorMemory] == "" { - app.Spec.Executor.Memory = util.StringPtr("1g") - } - } - if app.Spec.Executor.Instances == nil { // Check whether dynamic allocation is enabled in application spec. enableDynamicAllocation := app.Spec.DynamicAllocation != nil && app.Spec.DynamicAllocation.Enabled diff --git a/internal/webhook/sparkapplication_validator.go b/internal/webhook/sparkapplication_validator.go index 7b1fd41085..e1dd4f6f6a 100644 --- a/internal/webhook/sparkapplication_validator.go +++ b/internal/webhook/sparkapplication_validator.go @@ -117,6 +117,10 @@ func (v *SparkApplicationValidator) ValidateDelete(ctx context.Context, obj runt func (v *SparkApplicationValidator) validateSpec(_ context.Context, app *v1beta2.SparkApplication) error { logger.V(1).Info("Validating SparkApplication spec", "name", app.Name, "namespace", app.Namespace, "state", util.GetApplicationState(app)) + if err := v.validateSparkVersion(app); err != nil { + return err + } + if app.Spec.NodeSelector != nil && (app.Spec.Driver.NodeSelector != nil || app.Spec.Executor.NodeSelector != nil) { return fmt.Errorf("node selector cannot be defined at both SparkApplication and Driver/Executor") } @@ -144,6 +148,16 @@ func (v *SparkApplicationValidator) validateSpec(_ context.Context, app *v1beta2 return nil } +func (v *SparkApplicationValidator) validateSparkVersion(app *v1beta2.SparkApplication) error { + // The pod template feature requires Spark version 3.0.0 or higher. 
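+	// Failing validation at admission time surfaces the error immediately instead of at spark-submit.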
+ if app.Spec.Driver.Template != nil || app.Spec.Executor.Template != nil { + if util.CompareSemanticVersion(app.Spec.SparkVersion, "3.0.0") < 0 { + return fmt.Errorf("pod template feature requires Spark version 3.0.0 or higher") + } + } + return nil +} + func (v *SparkApplicationValidator) validateResourceUsage(ctx context.Context, app *v1beta2.SparkApplication) error { logger.V(1).Info("Validating SparkApplication resource usage", "name", app.Name, "namespace", app.Namespace, "state", util.GetApplicationState(app)) diff --git a/pkg/common/spark.go b/pkg/common/spark.go index 24ea5ff315..94ae2c51da 100644 --- a/pkg/common/spark.go +++ b/pkg/common/spark.go @@ -307,6 +307,9 @@ const ( // LabelLaunchedBySparkOperator is a label on Spark pods launched through the Spark Operator. LabelLaunchedBySparkOperator = LabelAnnotationPrefix + "launched-by-spark-operator" + // LabelMutatedBySparkOperator is a label on Spark pods that need to be mutated by webhook. + LabelMutatedBySparkOperator = LabelAnnotationPrefix + "mutated-by-spark-operator" + // LabelSubmissionID is the label that records the submission ID of the current run of an application. LabelSubmissionID = LabelAnnotationPrefix + "submission-id" diff --git a/pkg/util/util.go b/pkg/util/util.go index 850bc209d0..25f664dbca 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -19,8 +19,12 @@ package util import ( "fmt" "os" + "path/filepath" "strings" + "golang.org/x/mod/semver" + "sigs.k8s.io/yaml" + "github.com/kubeflow/spark-operator/pkg/common" ) @@ -77,3 +81,40 @@ func Int64Ptr(n int64) *int64 { func StringPtr(s string) *string { return &s } + +// CompareSemanticVersion compares two semantic versions. +func CompareSemanticVersion(v1, v2 string) int { + // Add 'v' prefix if needed + addPrefix := func(s string) string { + if !strings.HasPrefix(s, "v") { + return "v" + s + } + return s + } + return semver.Compare(addPrefix(v1), addPrefix(v2)) +} + +// WriteObjectToFile marshals the given object into a YAML document and writes it to the given file. +func WriteObjectToFile(obj interface{}, filePath string) error { + if err := os.MkdirAll(filepath.Dir(filePath), 0755); err != nil { + return err + } + + file, err := os.Create(filePath) + if err != nil { + return err + } + defer file.Close() + + data, err := yaml.Marshal(obj) + if err != nil { + return err + } + + _, err = file.Write(data) + if err != nil { + return err + } + + return nil +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 324ed3580f..5f24d4a372 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -21,6 +21,8 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/kubeflow/spark-operator/pkg/common" "github.com/kubeflow/spark-operator/pkg/util" @@ -129,3 +131,73 @@ var _ = Describe("StringPtr", func() { Expect(util.StringPtr(s)).To(Equal(&s)) }) }) + +var _ = Describe("CompareSemanticVersions", func() { + It("Should return 0 if the two versions are equal", func() { + Expect(util.CompareSemanticVersion("1.2.3", "1.2.3")) + Expect(util.CompareSemanticVersion("1.2.3", "v1.2.3")).To(Equal(0)) + }) + + It("Should return -1 if the first version is less than the second version", func() { + Expect(util.CompareSemanticVersion("2.3.4", "2.4.5")).To(Equal(-1)) + Expect(util.CompareSemanticVersion("2.4.5", "2.4.8")).To(Equal(-1)) + Expect(util.CompareSemanticVersion("2.4.8", "3.5.2")).To(Equal(-1)) + }) + + It("Should return +1 if the first version is greater than the second version", func() { + Expect(util.CompareSemanticVersion("2.4.5", "2.3.4")).To(Equal(1)) + Expect(util.CompareSemanticVersion("2.4.8", "2.4.5")).To(Equal(1)) + Expect(util.CompareSemanticVersion("3.5.2", "2.4.8")).To(Equal(1)) + }) +}) + +var _ = Describe("WriteObjectToFile", func() { + It("Should write the object to the file", func() { + podTemplate := &corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Labels: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + Annotations: map[string]string{ + "key3": "value3", + "key4": "value4", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "test-image", + }, + }, + }, + } + + expected := `metadata: + annotations: + key3: value3 + key4: value4 + creationTimestamp: null + labels: + key1: value1 + key2: value2 + name: test-pod +spec: + containers: + - image: test-image + name: test-container + resources: {} +` + file := "pod-template.yaml" + Expect(util.WriteObjectToFile(podTemplate, file)).To(Succeed()) + + data, err := os.ReadFile(file) + Expect(err).NotTo(HaveOccurred()) + actual := string(data) + + Expect(actual).To(Equal(expected)) + Expect(os.Remove(file)).NotTo(HaveOccurred()) + }) +})