diff --git a/go/tasks/plugins/k8s/spark/spark.go b/go/tasks/plugins/k8s/spark/spark.go index bd03ec6ce..358c9f11e 100755 --- a/go/tasks/plugins/k8s/spark/spark.go +++ b/go/tasks/plugins/k8s/spark/spark.go @@ -104,6 +104,7 @@ func (sparkResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCo } driverSpec := sparkOp.DriverSpec{ SparkPodSpec: sparkOp.SparkPodSpec{ + Affinity: config.GetK8sPluginConfig().DefaultAffinity, Annotations: annotations, Labels: labels, EnvVars: sparkEnvVars, @@ -120,6 +121,7 @@ func (sparkResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCo executorSpec := sparkOp.ExecutorSpec{ SparkPodSpec: sparkOp.SparkPodSpec{ + Affinity: config.GetK8sPluginConfig().DefaultAffinity, Annotations: annotations, Labels: labels, Image: &container.Image, diff --git a/go/tasks/plugins/k8s/spark/spark_test.go b/go/tasks/plugins/k8s/spark/spark_test.go index f7aa77ff0..4c61864ee 100755 --- a/go/tasks/plugins/k8s/spark/spark_test.go +++ b/go/tasks/plugins/k8s/spark/spark_test.go @@ -373,7 +373,27 @@ func TestBuildResourceSpark(t *testing.T) { defaultEnvVarsFromEnv := make(map[string]string) defaultEnvVarsFromEnv["fooEnv"] = "barEnv" + // Default affinity/anti-affinity + defaultAffinity := &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "x/default", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + }, + }, + } + assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{ + DefaultAffinity: defaultAffinity, DefaultPodSecurityContext: &corev1.PodSecurityContext{ RunAsUser: &runAsUser, }, @@ -538,6 +558,8 @@ func TestBuildResourceSpark(t *testing.T) { assert.Equal(t, sparkApp.Spec.Executor.EnvVars["foo"], defaultEnvVars["foo"]) assert.Equal(t, sparkApp.Spec.Driver.EnvVars["fooEnv"], defaultEnvVarsFromEnv["fooEnv"]) assert.Equal(t, sparkApp.Spec.Executor.EnvVars["fooEnv"], defaultEnvVarsFromEnv["fooEnv"]) + assert.Equal(t, sparkApp.Spec.Driver.Affinity, defaultAffinity) + assert.Equal(t, sparkApp.Spec.Executor.Affinity, defaultAffinity) // Case 2: Driver/Executor request cores set. dummyConfWithRequest := make(map[string]string)