From 8f51174d9d018f85fc09c3ab72769f5a7e7ff097 Mon Sep 17 00:00:00 2001
From: Andrew Sy Kim
Date: Fri, 13 Dec 2024 13:00:26 -0500
Subject: [PATCH] Add RayJob submitter job into PodSets (#3729)

* Add RayJob submitter job into PodSets

Signed-off-by: Andrew Sy Kim

* resolve failures in unit and integration tests

Signed-off-by: Andrew Sy Kim

* add getSubmitterTemplate function

Signed-off-by: Andrew Sy Kim

---------

Signed-off-by: Andrew Sy Kim
---
 .../jobs/rayjob/rayjob_controller.go          |  89 ++++++++++--
 .../jobs/rayjob/rayjob_controller_test.go     | 134 ++++++++++++++++++
 pkg/util/testingjobs/rayjob/wrappers.go       |  10 ++
 .../jobs/rayjob/rayjob_controller_test.go     |  18 ++-
 .../webhook/jobs/raycluster_webhook_test.go   |   6 +
 5 files changed, 245 insertions(+), 12 deletions(-)

diff --git a/pkg/controller/jobs/rayjob/rayjob_controller.go b/pkg/controller/jobs/rayjob/rayjob_controller.go
index fd6e3f5c10..16488da894 100644
--- a/pkg/controller/jobs/rayjob/rayjob_controller.go
+++ b/pkg/controller/jobs/rayjob/rayjob_controller.go
@@ -23,6 +23,8 @@ import (
    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
    rayutils "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
+   corev1 "k8s.io/api/core/v1"
+   "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/types"
@@ -39,8 +41,9 @@ var (
 )

 const (
-   headGroupPodSetName = "head"
-   FrameworkName       = "ray.io/rayjob"
+   headGroupPodSetName    = "head"
+   submitterJobPodSetName = "submitter"
+   FrameworkName          = "ray.io/rayjob"
 )

 func init() {
@@ -103,16 +106,15 @@ func (j *RayJob) PodLabelSelector() string {
 }

 func (j *RayJob) PodSets() []kueue.PodSet {
-   // len = workerGroups + head
-   podSets := make([]kueue.PodSet, len(j.Spec.RayClusterSpec.WorkerGroupSpecs)+1)
+   podSets := make([]kueue.PodSet, 0)

    // head
-   podSets[0] = kueue.PodSet{
+   podSets = append(podSets, kueue.PodSet{
        Name:            headGroupPodSetName,
        Template:        *j.Spec.RayClusterSpec.HeadGroupSpec.Template.DeepCopy(),
        Count:           1,
        TopologyRequest: jobframework.PodSetTopologyRequest(&j.Spec.RayClusterSpec.HeadGroupSpec.Template.ObjectMeta, nil, nil, nil),
-   }
+   })

    // workers
    for index := range j.Spec.RayClusterSpec.WorkerGroupSpecs {
@@ -124,18 +126,34 @@ func (j *RayJob) PodSets() []kueue.PodSet {
        if wgs.NumOfHosts > 1 {
            count *= wgs.NumOfHosts
        }
-       podSets[index+1] = kueue.PodSet{
+       podSets = append(podSets, kueue.PodSet{
            Name:            strings.ToLower(wgs.GroupName),
            Template:        *wgs.Template.DeepCopy(),
            Count:           count,
            TopologyRequest: jobframework.PodSetTopologyRequest(&wgs.Template.ObjectMeta, nil, nil, nil),
+       })
+   }
+
+   // submitter Job
+   if j.Spec.SubmissionMode == rayv1.K8sJobMode {
+       submitterJobPodSet := kueue.PodSet{
+           Name:  submitterJobPodSetName,
+           Count: 1,
        }
+
+       submitterJobPodSet.Template = *getSubmitterTemplate(j)
+       podSets = append(podSets, submitterJobPodSet)
    }
+
    return podSets
 }

 func (j *RayJob) RunWithPodSetsInfo(podSetsInfo []podset.PodSetInfo) error {
    expectedLen := len(j.Spec.RayClusterSpec.WorkerGroupSpecs) + 1
+   if j.Spec.SubmissionMode == rayv1.K8sJobMode {
+       expectedLen++
+   }
+
    if len(podSetsInfo) != expectedLen {
        return podset.BadPodSetsInfoLenError(expectedLen, len(podSetsInfo))
    }
@@ -157,11 +175,26 @@ func (j *RayJob) RunWithPodSetsInfo(podSetsInfo []podset.PodSetInfo) error {
            return err
        }
    }
+
+   // submitter
+   if j.Spec.SubmissionMode == rayv1.K8sJobMode {
+       submitterPod := getSubmitterTemplate(j)
+       info := podSetsInfo[expectedLen-1]
+       if err := podset.Merge(&submitterPod.ObjectMeta, &submitterPod.Spec, info); err != nil {
+           return err
+       }
+   }
+
    return nil
 }

 func (j *RayJob) RestorePodSetsInfo(podSetsInfo []podset.PodSetInfo) bool {
-   if len(podSetsInfo) != len(j.Spec.RayClusterSpec.WorkerGroupSpecs)+1 {
+   expectedLen := len(j.Spec.RayClusterSpec.WorkerGroupSpecs) + 1
+   if j.Spec.SubmissionMode == rayv1.K8sJobMode {
+       expectedLen++
+   }
+
+   if len(podSetsInfo) != expectedLen {
        return false
    }

@@ -175,6 +208,14 @@ func (j *RayJob) RestorePodSetsInfo(podSetsInfo []podset.PodSetInfo) bool {
        info := podSetsInfo[index+1]
        changed = podset.RestorePodSpec(&workerPod.ObjectMeta, &workerPod.Spec, info) || changed
    }
+
+   // submitter
+   if j.Spec.SubmissionMode == rayv1.K8sJobMode {
+       submitterPod := getSubmitterTemplate(j)
+       info := podSetsInfo[expectedLen-1]
+       changed = podset.RestorePodSpec(&submitterPod.ObjectMeta, &submitterPod.Spec, info) || changed
+   }
+
    return changed
 }

@@ -200,3 +241,35 @@ func GetWorkloadNameForRayJob(jobName string, jobUID types.UID) string {
 func isRayJob(owner *metav1.OwnerReference) bool {
    return owner.Kind == "RayJob" && strings.HasPrefix(owner.APIVersion, "ray.io/v1")
 }
+
+// getSubmitterTemplate returns the PodTemplateSpec of the submitter Job used for RayJob when submissionMode=K8sJobMode
+func getSubmitterTemplate(rayJob *RayJob) *corev1.PodTemplateSpec {
+   if rayJob.Spec.SubmitterPodTemplate != nil {
+       return rayJob.Spec.SubmitterPodTemplate
+   }
+
+   // The default submitter Job pod template is copied from
+   // https://github.com/ray-project/kuberay/blob/86506d6b88a6428fc66048c276d7d93b39df7489/ray-operator/controllers/ray/common/job.go#L122-L146
+   return &corev1.PodTemplateSpec{
+       Spec: corev1.PodSpec{
+           Containers: []corev1.Container{
+               {
+                   Name: "ray-job-submitter",
+                   // Use the image of the Ray head to be defensive against version mismatch issues
+                   Image: rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Image,
+                   Resources: corev1.ResourceRequirements{
+                       Limits: corev1.ResourceList{
+                           corev1.ResourceCPU:    resource.MustParse("1"),
+                           corev1.ResourceMemory: resource.MustParse("1Gi"),
+                       },
+                       Requests: corev1.ResourceList{
+                           corev1.ResourceCPU:    resource.MustParse("500m"),
+                           corev1.ResourceMemory: resource.MustParse("200Mi"),
+                       },
+                   },
+               },
+           },
+           RestartPolicy: corev1.RestartPolicyNever,
+       },
+   }
+}
diff --git a/pkg/controller/jobs/rayjob/rayjob_controller_test.go b/pkg/controller/jobs/rayjob/rayjob_controller_test.go
index 7622e7792b..00b65bc6c8 100644
--- a/pkg/controller/jobs/rayjob/rayjob_controller_test.go
+++ b/pkg/controller/jobs/rayjob/rayjob_controller_test.go
@@ -23,6 +23,7 @@ import (
    "github.com/google/go-cmp/cmp/cmpopts"
    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
    corev1 "k8s.io/api/core/v1"
+   "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/utils/ptr"

@@ -243,6 +244,139 @@ func TestPodSets(t *testing.T) {
                }
            },
        },
+        "with default job submitter": {
+            rayJob: (*RayJob)(testingrayutil.MakeJob("rayjob", "ns").
+                WithSubmissionMode(rayv1.K8sJobMode).
+                WithHeadGroupSpec(
+                    rayv1.HeadGroupSpec{
+                        Template: corev1.PodTemplateSpec{
+                            Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "head_c"}}},
+                        },
+                    },
+                ).
+ WithWorkerGroups( + rayv1.WorkerGroupSpec{ + GroupName: "group1", + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "group1_c"}}}, + }, + }, + rayv1.WorkerGroupSpec{ + GroupName: "group2", + Replicas: ptr.To[int32](3), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "group2_c"}}}, + }, + }, + ). + Obj()), + wantPodSets: func(rayJob *RayJob) []kueue.PodSet { + return []kueue.PodSet{ + { + Name: headGroupPodSetName, + Count: 1, + Template: *rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.DeepCopy(), + }, + { + Name: "group1", + Count: 1, + Template: *rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.DeepCopy(), + }, + { + Name: "group2", + Count: 3, + Template: *rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[1].Template.DeepCopy(), + }, + { + Name: "submitter", + Count: 1, + Template: *getSubmitterTemplate(rayJob), + }, + } + }, + }, + "with submitter job pod template override": { + rayJob: (*RayJob)(testingrayutil.MakeJob("rayjob", "ns"). + WithSubmissionMode(rayv1.K8sJobMode). + WithHeadGroupSpec( + rayv1.HeadGroupSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "head_c"}}}, + }, + }, + ). + WithWorkerGroups( + rayv1.WorkerGroupSpec{ + GroupName: "group1", + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "group1_c"}}}, + }, + }, + rayv1.WorkerGroupSpec{ + GroupName: "group2", + Replicas: ptr.To[int32](3), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "group2_c"}}}, + }, + }, + ). + WithSubmitterPodTemplate(corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "ray-job-submitter", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("50m"), + corev1.ResourceMemory: resource.MustParse("100Mi"), + }, + }, + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + }, + }). 
+ Obj()), + wantPodSets: func(rayJob *RayJob) []kueue.PodSet { + return []kueue.PodSet{ + { + Name: headGroupPodSetName, + Count: 1, + Template: *rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.DeepCopy(), + }, + { + Name: "group1", + Count: 1, + Template: *rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.DeepCopy(), + }, + { + Name: "group2", + Count: 3, + Template: *rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[1].Template.DeepCopy(), + }, + { + Name: "submitter", + Count: 1, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "ray-job-submitter", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("50m"), + corev1.ResourceMemory: resource.MustParse("100Mi"), + }, + }, + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + }, + } + }, + }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { diff --git a/pkg/util/testingjobs/rayjob/wrappers.go b/pkg/util/testingjobs/rayjob/wrappers.go index 2efb403190..4718f7cfae 100644 --- a/pkg/util/testingjobs/rayjob/wrappers.go +++ b/pkg/util/testingjobs/rayjob/wrappers.go @@ -136,6 +136,16 @@ func (j *JobWrapper) WithWorkerGroups(workers ...rayv1.WorkerGroupSpec) *JobWrap return j } +func (j *JobWrapper) WithSubmitterPodTemplate(template corev1.PodTemplateSpec) *JobWrapper { + j.Spec.SubmitterPodTemplate = &template + return j +} + +func (j *JobWrapper) WithSubmissionMode(submissionMode rayv1.JobSubmissionMode) *JobWrapper { + j.Spec.SubmissionMode = submissionMode + return j +} + func (j *JobWrapper) WithHeadGroupSpec(value rayv1.HeadGroupSpec) *JobWrapper { j.Spec.RayClusterSpec.HeadGroupSpec = value return j diff --git a/test/integration/controller/jobs/rayjob/rayjob_controller_test.go b/test/integration/controller/jobs/rayjob/rayjob_controller_test.go index 7ec79f402d..ac7c2405f5 100644 --- a/test/integration/controller/jobs/rayjob/rayjob_controller_test.go +++ b/test/integration/controller/jobs/rayjob/rayjob_controller_test.go @@ -181,6 +181,11 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{ corev1.ResourceCPU: "spot", }, + }, kueue.PodSetAssignment{ + Name: createdWorkload.Spec.PodSets[2].Name, + Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{ + corev1.ResourceCPU: "on-demand", + }, }, ).Obj() gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, createdWorkload, admission)).Should(gomega.Succeed()) @@ -366,7 +371,7 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.O ginkgo.DescribeTable("Single job at different stages of progress towards completion", func(podsReadyTestSpec podsReadyTestSpec) { ginkgo.By("Create a job") - job := testingrayjob.MakeJob(jobName, ns.Name).Obj() + job := testingrayjob.MakeJob(jobName, ns.Name).WithSubmissionMode(rayv1.K8sJobMode).Obj() jobQueueName := "test-queue" job.Annotations = map[string]string{constants.QueueAnnotation: jobQueueName} gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed()) @@ -394,6 +399,11 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.O Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{ corev1.ResourceCPU: "default", }, + }, kueue.PodSetAssignment{ + Name: createdWorkload.Spec.PodSets[2].Name, + Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{ + corev1.ResourceCPU: "default", + }, }, ).Obj() 
gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, createdWorkload, admission)).Should(gomega.Succeed()) @@ -548,8 +558,8 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde clusterQueue = testing.MakeClusterQueue("dev-clusterqueue"). ResourceGroup( - *testing.MakeFlavorQuotas("spot-untainted").Resource(corev1.ResourceCPU, "5").Obj(), - *testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "5").Obj(), + *testing.MakeFlavorQuotas("spot-untainted").Resource(corev1.ResourceCPU, "5").Resource(corev1.ResourceMemory, "1Gi").Obj(), + *testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "5").Resource(corev1.ResourceMemory, "1Gi").Obj(), ).Obj() gomega.Expect(k8sClient.Create(ctx, clusterQueue)).Should(gomega.Succeed()) }) @@ -614,7 +624,7 @@ var _ = ginkgo.Describe("Job controller with preemption enabled", ginkgo.Ordered clusterQueue = testing.MakeClusterQueue("clusterqueue"). ResourceGroup( - *testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "4").Obj(), + *testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "5").Resource(corev1.ResourceMemory, "4Gi").Obj(), ). Preemption(kueue.ClusterQueuePreemption{ WithinClusterQueue: kueue.PreemptionPolicyLowerPriority, diff --git a/test/integration/webhook/jobs/raycluster_webhook_test.go b/test/integration/webhook/jobs/raycluster_webhook_test.go index 6e59a6e018..20aa4ee35a 100644 --- a/test/integration/webhook/jobs/raycluster_webhook_test.go +++ b/test/integration/webhook/jobs/raycluster_webhook_test.go @@ -153,6 +153,12 @@ var _ = ginkgo.Describe("RayCluster Webhook", func() { corev1.ResourceCPU: "default", }, }, + kueue.PodSetAssignment{ + Name: createdWorkload.Spec.PodSets[2].Name, + Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{ + corev1.ResourceCPU: "default", + }, + }, ).Obj() gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, createdWorkload, admission)).To(gomega.Succeed()) util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, createdWorkload)
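
Note on the PodSet accounting introduced by this patch: a RayJob using K8sJobMode now maps to one PodSet for the head group, one per worker group, and one for the submitter Job, so RunWithPodSetsInfo and RestorePodSetsInfo expect len(WorkerGroupSpecs)+2 entries instead of len(WorkerGroupSpecs)+1, with the submitter's PodSetInfo at the last index. The snippet below is a minimal standalone sketch of that accounting; it is not part of the patch, the helper name expectedPodSetLen is hypothetical, and only the rayv1 types already used in the diff are assumed.

package main

import (
	"fmt"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// expectedPodSetLen is a hypothetical helper mirroring the expectedLen accounting in
// RunWithPodSetsInfo and RestorePodSetsInfo: one PodSet for the head group, one per
// worker group, plus one for the submitter Job when the RayJob uses K8sJobMode.
func expectedPodSetLen(rayJob *rayv1.RayJob) int {
	n := len(rayJob.Spec.RayClusterSpec.WorkerGroupSpecs) + 1 // workers + head
	if rayJob.Spec.SubmissionMode == rayv1.K8sJobMode {
		n++ // the submitter PodSetInfo sits at index n-1
	}
	return n
}

func main() {
	rayJob := &rayv1.RayJob{
		Spec: rayv1.RayJobSpec{
			SubmissionMode: rayv1.K8sJobMode,
			RayClusterSpec: &rayv1.RayClusterSpec{
				WorkerGroupSpecs: make([]rayv1.WorkerGroupSpec, 2),
			},
		},
	}
	fmt.Println(expectedPodSetLen(rayJob)) // prints 4: head + 2 worker groups + submitter
}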
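When Spec.SubmitterPodTemplate is set, getSubmitterTemplate returns it instead of the KubeRay default (limits of 1 CPU / 1Gi and requests of 500m / 200Mi), so the "submitter" PodSet is quota-accounted with the user-provided resources, as exercised by the "with submitter job pod template override" test case. Below is a minimal sketch of such an override; it is illustrative only, not part of the patch, the helper name buildRayJobWithSubmitterOverride is hypothetical, and the RayJob omits the cluster spec and entrypoint for brevity.

package main

import (
	"fmt"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// buildRayJobWithSubmitterOverride returns a RayJob in K8sJobMode whose submitter Job
// pod template overrides the KubeRay default resources; Kueue's "submitter" PodSet
// then reflects these custom requests instead of the defaults.
func buildRayJobWithSubmitterOverride() *rayv1.RayJob {
	return &rayv1.RayJob{
		Spec: rayv1.RayJobSpec{
			SubmissionMode: rayv1.K8sJobMode,
			SubmitterPodTemplate: &corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyNever,
					Containers: []corev1.Container{{
						Name: "ray-job-submitter",
						Resources: corev1.ResourceRequirements{
							Requests: corev1.ResourceList{
								corev1.ResourceCPU:    resource.MustParse("50m"),
								corev1.ResourceMemory: resource.MustParse("100Mi"),
							},
						},
					}},
				},
			},
		},
	}
}

func main() {
	rayJob := buildRayJobWithSubmitterOverride()
	cpu := rayJob.Spec.SubmitterPodTemplate.Spec.Containers[0].Resources.Requests[corev1.ResourceCPU]
	fmt.Printf("submitter CPU request: %s\n", cpu.String())
}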