diff --git a/go.mod b/go.mod index d5e600b9cb..11af87ff3b 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/stretchr/testify v1.9.0 go.uber.org/zap v1.27.0 gocloud.dev v0.40.0 + golang.org/x/mod v0.20.0 golang.org/x/net v0.30.0 golang.org/x/time v0.7.0 helm.sh/helm/v3 v3.16.2 @@ -30,6 +31,7 @@ require ( k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 sigs.k8s.io/controller-runtime v0.17.5 sigs.k8s.io/scheduler-plugins v0.29.8 + sigs.k8s.io/yaml v1.4.0 volcano.sh/apis v1.9.0 ) @@ -229,7 +231,6 @@ require ( sigs.k8s.io/kustomize/api v0.17.2 // indirect sigs.k8s.io/kustomize/kyaml v0.17.1 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect - sigs.k8s.io/yaml v1.4.0 // indirect ) replace ( diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index 7a707df001..e4cb78d248 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -19,6 +19,7 @@ package sparkapplication import ( "context" "fmt" + "os" "strconv" "time" @@ -643,8 +644,10 @@ func (r *Reconciler) getSparkApplication(key types.NamespacedName) (*v1beta2.Spa // submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit. func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (submitErr error) { logger.Info("Submitting SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) + // SubmissionID must be set before creating any resources to ensure all the resources are labeled. app.Status.SubmissionID = uuid.New().String() + app.Status.DriverInfo.PodName = util.GetDriverPodName(app) app.Status.LastSubmissionAttemptTime = metav1.Now() app.Status.SubmissionAttempts = app.Status.SubmissionAttempts + 1 @@ -736,8 +739,12 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (subm } } - driverPodName := util.GetDriverPodName(app) - app.Status.DriverInfo.PodName = driverPodName + defer func() { + if err := r.cleanUpPodTemplateFiles(app); err != nil { + logger.Error(fmt.Errorf("failed to clean up pod template files: %v", err), "name", app.Name, "namespace", app.Namespace) + } + }() + sparkSubmitArgs, err := buildSparkSubmitArgs(app) if err != nil { return fmt.Errorf("failed to build spark-submit arguments: %v", err) @@ -746,6 +753,7 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (subm // Try submitting the application by running spark-submit. logger.Info("Running spark-submit for SparkApplication", "name", app.Name, "namespace", app.Namespace, "arguments", sparkSubmitArgs) if err := runSparkSubmit(newSubmission(sparkSubmitArgs, app)); err != nil { + r.recordSparkApplicationEvent(app) return fmt.Errorf("failed to run spark-submit: %v", err) } return nil @@ -1224,3 +1232,18 @@ func (r *Reconciler) cleanUpOnTermination(_, newApp *v1beta2.SparkApplication) e } return nil } + +// cleanUpPodTemplateFiles cleans up the driver and executor pod template files. +func (r *Reconciler) cleanUpPodTemplateFiles(app *v1beta2.SparkApplication) error { + if app.Spec.Driver.Template == nil && app.Spec.Executor.Template == nil { + return nil + } + path := fmt.Sprintf("/tmp/spark/%s", app.Status.SubmissionID) + if err := os.RemoveAll(path); err != nil { + if !os.IsNotExist(err) { + return err + } + } + logger.V(1).Info("Deleted pod template files", "path", path) + return nil +} diff --git a/internal/controller/sparkapplication/submission.go b/internal/controller/sparkapplication/submission.go index 66e4a0be83..d0bb6f5781 100644 --- a/internal/controller/sparkapplication/submission.go +++ b/internal/controller/sparkapplication/submission.go @@ -83,15 +83,17 @@ func buildSparkSubmitArgs(app *v1beta2.SparkApplication) ([]string, error) { submissionWaitAppCompletionOption, sparkConfOption, hadoopConfOption, + driverPodTemplateOption, driverPodNameOption, driverConfOption, - driverSecretOption, driverEnvOption, + driverSecretOption, driverVolumeMountsOption, + executorPodTemplateOption, executorConfOption, + executorEnvOption, executorSecretOption, executorVolumeMountsOption, - executorEnvOption, nodeSelectorOption, dynamicAllocationOption, proxyUserOption, @@ -303,6 +305,12 @@ func driverConfOption(app *v1beta2.SparkApplication) ([]string, error) { property = fmt.Sprintf(common.SparkKubernetesDriverLabelTemplate, common.LabelLaunchedBySparkOperator) args = append(args, "--conf", fmt.Sprintf("%s=%s", property, "true")) + // If Spark version is less than 3.0.0 or driver pod template is not defined, then the driver pod needs to be mutated by the webhook. + if util.CompareSemanticVersion(app.Spec.SparkVersion, "3.0.0") < 0 || app.Spec.Driver.Template == nil { + property = fmt.Sprintf(common.SparkKubernetesDriverLabelTemplate, common.LabelMutatedBySparkOperator) + args = append(args, "--conf", fmt.Sprintf("%s=%s", property, "true")) + } + property = fmt.Sprintf(common.SparkKubernetesDriverLabelTemplate, common.LabelSubmissionID) args = append(args, "--conf", fmt.Sprintf("%s=%s", property, app.Status.SubmissionID)) @@ -646,6 +654,12 @@ func executorConfOption(app *v1beta2.SparkApplication) ([]string, error) { property = fmt.Sprintf(common.SparkKubernetesExecutorLabelTemplate, common.LabelLaunchedBySparkOperator) args = append(args, "--conf", fmt.Sprintf("%s=%s", property, "true")) + // If Spark version is less than 3.0.0 or executor pod template is not defined, then the executor pods need to be mutated by the webhook. + if util.CompareSemanticVersion(app.Spec.SparkVersion, "3.0.0") < 0 || app.Spec.Executor.Template == nil { + property = fmt.Sprintf(common.SparkKubernetesExecutorLabelTemplate, common.LabelMutatedBySparkOperator) + args = append(args, "--conf", fmt.Sprintf("%s=%s", property, "true")) + } + property = fmt.Sprintf(common.SparkKubernetesExecutorLabelTemplate, common.LabelSubmissionID) args = append(args, "--conf", fmt.Sprintf("%s=%s", property, app.Status.SubmissionID)) @@ -1022,3 +1036,45 @@ func mainApplicationFileOption(app *v1beta2.SparkApplication) ([]string, error) func applicationOption(app *v1beta2.SparkApplication) ([]string, error) { return app.Spec.Arguments, nil } + +// driverPodTemplateOption returns the driver pod template arguments. +func driverPodTemplateOption(app *v1beta2.SparkApplication) ([]string, error) { + if app.Spec.Driver.Template == nil { + return []string{}, nil + } + + podTemplateFile := fmt.Sprintf("/tmp/spark/%s/driver-pod-template.yaml", app.Status.SubmissionID) + if err := util.WriteObjectToFile(app.Spec.Driver.Template, podTemplateFile); err != nil { + return []string{}, err + } + logger.V(1).Info("Created driver pod template file for SparkApplication", "name", app.Name, "namespace", app.Namespace, "file", podTemplateFile) + + args := []string{ + "--conf", + fmt.Sprintf("%s=%s", common.SparkKubernetesDriverPodTemplateFile, podTemplateFile), + "--conf", + fmt.Sprintf("%s=%s", common.SparkKubernetesDriverPodTemplateContainerName, common.SparkDriverContainerName), + } + return args, nil +} + +// executorPodTemplateOption returns the executor pod template arguments. +func executorPodTemplateOption(app *v1beta2.SparkApplication) ([]string, error) { + if app.Spec.Executor.Template == nil { + return []string{}, nil + } + + podTemplateFile := fmt.Sprintf("/tmp/spark/%s/executor-pod-template.yaml", app.Status.SubmissionID) + if err := util.WriteObjectToFile(app.Spec.Executor.Template, podTemplateFile); err != nil { + return []string{}, err + } + logger.V(1).Info("Created executor pod template file for SparkApplication", "name", app.Name, "namespace", app.Namespace, "file", podTemplateFile) + + args := []string{ + "--conf", + fmt.Sprintf("%s=%s", common.SparkKubernetesExecutorPodTemplateFile, podTemplateFile), + "--conf", + fmt.Sprintf("%s=%s", common.SparkKubernetesExecutorPodTemplateContainerName, common.Spark3DefaultExecutorContainerName), + } + return args, nil +} diff --git a/internal/webhook/sparkapplication_defaulter.go b/internal/webhook/sparkapplication_defaulter.go index 661ecf708a..9c10ea10c2 100644 --- a/internal/webhook/sparkapplication_defaulter.go +++ b/internal/webhook/sparkapplication_defaulter.go @@ -83,32 +83,9 @@ func defaultSparkApplication(app *v1beta2.SparkApplication) { } func defaultDriverSpec(app *v1beta2.SparkApplication) { - if app.Spec.Driver.Cores == nil { - if app.Spec.SparkConf == nil || app.Spec.SparkConf[common.SparkDriverCores] == "" { - app.Spec.Driver.Cores = util.Int32Ptr(1) - } - } - - if app.Spec.Driver.Memory == nil { - if app.Spec.SparkConf == nil || app.Spec.SparkConf[common.SparkDriverMemory] == "" { - app.Spec.Driver.Memory = util.StringPtr("1g") - } - } } func defaultExecutorSpec(app *v1beta2.SparkApplication) { - if app.Spec.Executor.Cores == nil { - if app.Spec.SparkConf == nil || app.Spec.SparkConf[common.SparkExecutorCores] == "" { - app.Spec.Executor.Cores = util.Int32Ptr(1) - } - } - - if app.Spec.Executor.Memory == nil { - if app.Spec.SparkConf == nil || app.Spec.SparkConf[common.SparkExecutorMemory] == "" { - app.Spec.Executor.Memory = util.StringPtr("1g") - } - } - if app.Spec.Executor.Instances == nil { // Check whether dynamic allocation is enabled in application spec. enableDynamicAllocation := app.Spec.DynamicAllocation != nil && app.Spec.DynamicAllocation.Enabled diff --git a/internal/webhook/sparkapplication_validator.go b/internal/webhook/sparkapplication_validator.go index 7b1fd41085..e1dd4f6f6a 100644 --- a/internal/webhook/sparkapplication_validator.go +++ b/internal/webhook/sparkapplication_validator.go @@ -117,6 +117,10 @@ func (v *SparkApplicationValidator) ValidateDelete(ctx context.Context, obj runt func (v *SparkApplicationValidator) validateSpec(_ context.Context, app *v1beta2.SparkApplication) error { logger.V(1).Info("Validating SparkApplication spec", "name", app.Name, "namespace", app.Namespace, "state", util.GetApplicationState(app)) + if err := v.validateSparkVersion(app); err != nil { + return err + } + if app.Spec.NodeSelector != nil && (app.Spec.Driver.NodeSelector != nil || app.Spec.Executor.NodeSelector != nil) { return fmt.Errorf("node selector cannot be defined at both SparkApplication and Driver/Executor") } @@ -144,6 +148,16 @@ func (v *SparkApplicationValidator) validateSpec(_ context.Context, app *v1beta2 return nil } +func (v *SparkApplicationValidator) validateSparkVersion(app *v1beta2.SparkApplication) error { + // The pod template feature requires Spark version 3.0.0 or higher. + if app.Spec.Driver.Template != nil || app.Spec.Executor.Template != nil { + if util.CompareSemanticVersion(app.Spec.SparkVersion, "3.0.0") < 0 { + return fmt.Errorf("pod template feature requires Spark version 3.0.0 or higher") + } + } + return nil +} + func (v *SparkApplicationValidator) validateResourceUsage(ctx context.Context, app *v1beta2.SparkApplication) error { logger.V(1).Info("Validating SparkApplication resource usage", "name", app.Name, "namespace", app.Namespace, "state", util.GetApplicationState(app)) diff --git a/pkg/common/spark.go b/pkg/common/spark.go index 24ea5ff315..94ae2c51da 100644 --- a/pkg/common/spark.go +++ b/pkg/common/spark.go @@ -307,6 +307,9 @@ const ( // LabelLaunchedBySparkOperator is a label on Spark pods launched through the Spark Operator. LabelLaunchedBySparkOperator = LabelAnnotationPrefix + "launched-by-spark-operator" + // LabelMutatedBySparkOperator is a label on Spark pods that need to be mutated by webhook. + LabelMutatedBySparkOperator = LabelAnnotationPrefix + "mutated-by-spark-operator" + // LabelSubmissionID is the label that records the submission ID of the current run of an application. LabelSubmissionID = LabelAnnotationPrefix + "submission-id" diff --git a/pkg/util/util.go b/pkg/util/util.go index 850bc209d0..25f664dbca 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -19,8 +19,12 @@ package util import ( "fmt" "os" + "path/filepath" "strings" + "golang.org/x/mod/semver" + "sigs.k8s.io/yaml" + "github.com/kubeflow/spark-operator/pkg/common" ) @@ -77,3 +81,40 @@ func Int64Ptr(n int64) *int64 { func StringPtr(s string) *string { return &s } + +// CompareSemanticVersion compares two semantic versions. +func CompareSemanticVersion(v1, v2 string) int { + // Add 'v' prefix if needed + addPrefix := func(s string) string { + if !strings.HasPrefix(s, "v") { + return "v" + s + } + return s + } + return semver.Compare(addPrefix(v1), addPrefix(v2)) +} + +// WriteObjectToFile marshals the given object into a YAML document and writes it to the given file. +func WriteObjectToFile(obj interface{}, filePath string) error { + if err := os.MkdirAll(filepath.Dir(filePath), 0755); err != nil { + return err + } + + file, err := os.Create(filePath) + if err != nil { + return err + } + defer file.Close() + + data, err := yaml.Marshal(obj) + if err != nil { + return err + } + + _, err = file.Write(data) + if err != nil { + return err + } + + return nil +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 324ed3580f..5f24d4a372 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -21,6 +21,8 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/kubeflow/spark-operator/pkg/common" "github.com/kubeflow/spark-operator/pkg/util" @@ -129,3 +131,73 @@ var _ = Describe("StringPtr", func() { Expect(util.StringPtr(s)).To(Equal(&s)) }) }) + +var _ = Describe("CompareSemanticVersions", func() { + It("Should return 0 if the two versions are equal", func() { + Expect(util.CompareSemanticVersion("1.2.3", "1.2.3")) + Expect(util.CompareSemanticVersion("1.2.3", "v1.2.3")).To(Equal(0)) + }) + + It("Should return -1 if the first version is less than the second version", func() { + Expect(util.CompareSemanticVersion("2.3.4", "2.4.5")).To(Equal(-1)) + Expect(util.CompareSemanticVersion("2.4.5", "2.4.8")).To(Equal(-1)) + Expect(util.CompareSemanticVersion("2.4.8", "3.5.2")).To(Equal(-1)) + }) + + It("Should return +1 if the first version is greater than the second version", func() { + Expect(util.CompareSemanticVersion("2.4.5", "2.3.4")).To(Equal(1)) + Expect(util.CompareSemanticVersion("2.4.8", "2.4.5")).To(Equal(1)) + Expect(util.CompareSemanticVersion("3.5.2", "2.4.8")).To(Equal(1)) + }) +}) + +var _ = Describe("WriteObjectToFile", func() { + It("Should write the object to the file", func() { + podTemplate := &corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Labels: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + Annotations: map[string]string{ + "key3": "value3", + "key4": "value4", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "test-image", + }, + }, + }, + } + + expected := `metadata: + annotations: + key3: value3 + key4: value4 + creationTimestamp: null + labels: + key1: value1 + key2: value2 + name: test-pod +spec: + containers: + - image: test-image + name: test-container + resources: {} +` + file := "pod-template.yaml" + Expect(util.WriteObjectToFile(podTemplate, file)).To(Succeed()) + + data, err := os.ReadFile(file) + Expect(err).NotTo(HaveOccurred()) + actual := string(data) + + Expect(actual).To(Equal(expected)) + Expect(os.Remove(file)).NotTo(HaveOccurred()) + }) +})