Add RayJob submitter job into PodSets (#3729)
* Add RayJob submitter job into PodSets

Signed-off-by: Andrew Sy Kim <[email protected]>

* resolve failures in unit and integration tests

Signed-off-by: Andrew Sy Kim <[email protected]>

* add getSubmitterTemplate function

Signed-off-by: Andrew Sy Kim <[email protected]>

---------

Signed-off-by: Andrew Sy Kim <[email protected]>
andrewsykim authored Dec 13, 2024
1 parent 1e1445a commit 8f51174
Showing 5 changed files with 245 additions and 12 deletions.
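In short: when a RayJob uses submissionMode: K8sJobMode, its Workload now carries one extra PodSet named "submitter" for the KubeRay-created submitter Job, in addition to the "head" PodSet and one PodSet per worker group. A minimal sketch of the resulting count, assuming the kuberay v1 API types already referenced in this diff; the helper, the group name, and the printed value are illustrative, not part of the commit:

package main

import (
	"fmt"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// expectedPodSetCount mirrors the length logic this commit adds to
// RunWithPodSetsInfo and RestorePodSetsInfo: one "head" PodSet, one PodSet
// per worker group, plus a "submitter" PodSet only in K8sJobMode.
// Illustrative helper, not part of the commit.
func expectedPodSetCount(j *rayv1.RayJob) int {
	n := 1 + len(j.Spec.RayClusterSpec.WorkerGroupSpecs) // head + worker groups
	if j.Spec.SubmissionMode == rayv1.K8sJobMode {
		n++ // submitter Job pod
	}
	return n
}

func main() {
	job := &rayv1.RayJob{
		Spec: rayv1.RayJobSpec{
			SubmissionMode: rayv1.K8sJobMode,
			RayClusterSpec: &rayv1.RayClusterSpec{
				WorkerGroupSpecs: []rayv1.WorkerGroupSpec{{GroupName: "group1"}},
			},
		},
	}
	fmt.Println(expectedPodSetCount(job)) // 3: head + group1 + submitter
}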
89 changes: 81 additions & 8 deletions pkg/controller/jobs/rayjob/rayjob_controller.go
@@ -23,6 +23,8 @@ import (

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
rayutils "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
@@ -39,8 +41,9 @@ var (
)

const (
headGroupPodSetName = "head"
FrameworkName = "ray.io/rayjob"
headGroupPodSetName = "head"
submitterJobPodSetName = "submitter"
FrameworkName = "ray.io/rayjob"
)

func init() {
@@ -103,16 +106,15 @@ func (j *RayJob) PodLabelSelector() string {
}

func (j *RayJob) PodSets() []kueue.PodSet {
// len = workerGroups + head
podSets := make([]kueue.PodSet, len(j.Spec.RayClusterSpec.WorkerGroupSpecs)+1)
podSets := make([]kueue.PodSet, 0)

// head
podSets[0] = kueue.PodSet{
podSets = append(podSets, kueue.PodSet{
Name: headGroupPodSetName,
Template: *j.Spec.RayClusterSpec.HeadGroupSpec.Template.DeepCopy(),
Count: 1,
TopologyRequest: jobframework.PodSetTopologyRequest(&j.Spec.RayClusterSpec.HeadGroupSpec.Template.ObjectMeta, nil, nil, nil),
}
})

// workers
for index := range j.Spec.RayClusterSpec.WorkerGroupSpecs {
@@ -124,18 +126,34 @@ func (j *RayJob) PodSets() []kueue.PodSet {
if wgs.NumOfHosts > 1 {
count *= wgs.NumOfHosts
}
podSets[index+1] = kueue.PodSet{
podSets = append(podSets, kueue.PodSet{
Name: strings.ToLower(wgs.GroupName),
Template: *wgs.Template.DeepCopy(),
Count: count,
TopologyRequest: jobframework.PodSetTopologyRequest(&wgs.Template.ObjectMeta, nil, nil, nil),
})
}

// submitter Job
if j.Spec.SubmissionMode == rayv1.K8sJobMode {
submitterJobPodSet := kueue.PodSet{
Name: submitterJobPodSetName,
Count: 1,
}

submitterJobPodSet.Template = *getSubmitterTemplate(j)
podSets = append(podSets, submitterJobPodSet)
}

return podSets
}

func (j *RayJob) RunWithPodSetsInfo(podSetsInfo []podset.PodSetInfo) error {
expectedLen := len(j.Spec.RayClusterSpec.WorkerGroupSpecs) + 1
if j.Spec.SubmissionMode == rayv1.K8sJobMode {
expectedLen++
}

if len(podSetsInfo) != expectedLen {
return podset.BadPodSetsInfoLenError(expectedLen, len(podSetsInfo))
}
@@ -157,11 +175,26 @@ func (j *RayJob) RunWithPodSetsInfo(podSetsInfo []podset.PodSetInfo) error {
return err
}
}

// submitter
if j.Spec.SubmissionMode == rayv1.K8sJobMode {
submitterPod := getSubmitterTemplate(j)
info := podSetsInfo[expectedLen-1]
if err := podset.Merge(&submitterPod.ObjectMeta, &submitterPod.Spec, info); err != nil {
return err
}
}

return nil
}

func (j *RayJob) RestorePodSetsInfo(podSetsInfo []podset.PodSetInfo) bool {
if len(podSetsInfo) != len(j.Spec.RayClusterSpec.WorkerGroupSpecs)+1 {
expectedLen := len(j.Spec.RayClusterSpec.WorkerGroupSpecs) + 1
if j.Spec.SubmissionMode == rayv1.K8sJobMode {
expectedLen++
}

if len(podSetsInfo) != expectedLen {
return false
}

@@ -175,6 +208,14 @@ func (j *RayJob) RestorePodSetsInfo(podSetsInfo []podset.PodSetInfo) bool {
info := podSetsInfo[index+1]
changed = podset.RestorePodSpec(&workerPod.ObjectMeta, &workerPod.Spec, info) || changed
}

// submitter
if j.Spec.SubmissionMode == rayv1.K8sJobMode {
submitterPod := getSubmitterTemplate(j)
info := podSetsInfo[expectedLen-1]
changed = podset.RestorePodSpec(&submitterPod.ObjectMeta, &submitterPod.Spec, info) || changed
}

return changed
}

@@ -200,3 +241,35 @@ func GetWorkloadNameForRayJob(jobName string, jobUID types.UID) string {
func isRayJob(owner *metav1.OwnerReference) bool {
return owner.Kind == "RayJob" && strings.HasPrefix(owner.APIVersion, "ray.io/v1")
}

// getSubmitterTemplate returns the PodTemplateSpec of the submitter Job used for RayJob when submissionMode=K8sJobMode
func getSubmitterTemplate(rayJob *RayJob) *corev1.PodTemplateSpec {
if rayJob.Spec.SubmitterPodTemplate != nil {
return rayJob.Spec.SubmitterPodTemplate
}

// The default submitter Job pod template is copied from
// https://github.com/ray-project/kuberay/blob/86506d6b88a6428fc66048c276d7d93b39df7489/ray-operator/controllers/ray/common/job.go#L122-L146
return &corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "ray-job-submitter",
// Use the image of the Ray head to be defensive against version mismatch issues
Image: rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Image,
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("500m"),
corev1.ResourceMemory: resource.MustParse("200Mi"),
},
},
},
},
RestartPolicy: corev1.RestartPolicyNever,
},
}
}
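A hypothetical test sketch of the defaulting branch above, written as if it lived in the same package and reused the imports already present in rayjob_controller_test.go (the image tag is made up for the example): when spec.submitterPodTemplate is nil, the submitter container inherits the head group's image and gets the fixed default resources.

func TestGetSubmitterTemplateDefaults(t *testing.T) {
	job := (*RayJob)(&rayv1.RayJob{
		Spec: rayv1.RayJobSpec{
			SubmissionMode: rayv1.K8sJobMode,
			RayClusterSpec: &rayv1.RayClusterSpec{
				HeadGroupSpec: rayv1.HeadGroupSpec{
					Template: corev1.PodTemplateSpec{
						Spec: corev1.PodSpec{
							Containers: []corev1.Container{{Name: "head_c", Image: "rayproject/ray:2.9.0"}},
						},
					},
				},
			},
		},
	})

	tpl := getSubmitterTemplate(job)
	// The submitter reuses the head image to avoid version mismatches.
	if got := tpl.Spec.Containers[0].Image; got != "rayproject/ray:2.9.0" {
		t.Errorf("submitter image = %q, want head image", got)
	}
	// The default resources are fixed: 1 CPU / 1Gi limits, 500m / 200Mi requests.
	if cpu := tpl.Spec.Containers[0].Resources.Limits[corev1.ResourceCPU]; cpu.Cmp(resource.MustParse("1")) != 0 {
		t.Errorf("default CPU limit = %s, want 1", cpu.String())
	}
}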
134 changes: 134 additions & 0 deletions pkg/controller/jobs/rayjob/rayjob_controller_test.go
@@ -23,6 +23,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"

@@ -243,6 +244,139 @@ func TestPodSets(t *testing.T) {
}
},
},
"with default job submitter": {
rayJob: (*RayJob)(testingrayutil.MakeJob("rayjob", "ns").
WithSubmissionMode(rayv1.K8sJobMode).
WithHeadGroupSpec(
rayv1.HeadGroupSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "head_c"}}},
},
},
).
WithWorkerGroups(
rayv1.WorkerGroupSpec{
GroupName: "group1",
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "group1_c"}}},
},
},
rayv1.WorkerGroupSpec{
GroupName: "group2",
Replicas: ptr.To[int32](3),
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "group2_c"}}},
},
},
).
Obj()),
wantPodSets: func(rayJob *RayJob) []kueue.PodSet {
return []kueue.PodSet{
{
Name: headGroupPodSetName,
Count: 1,
Template: *rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.DeepCopy(),
},
{
Name: "group1",
Count: 1,
Template: *rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.DeepCopy(),
},
{
Name: "group2",
Count: 3,
Template: *rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[1].Template.DeepCopy(),
},
{
Name: "submitter",
Count: 1,
Template: *getSubmitterTemplate(rayJob),
},
}
},
},
"with submitter job pod template override": {
rayJob: (*RayJob)(testingrayutil.MakeJob("rayjob", "ns").
WithSubmissionMode(rayv1.K8sJobMode).
WithHeadGroupSpec(
rayv1.HeadGroupSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "head_c"}}},
},
},
).
WithWorkerGroups(
rayv1.WorkerGroupSpec{
GroupName: "group1",
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "group1_c"}}},
},
},
rayv1.WorkerGroupSpec{
GroupName: "group2",
Replicas: ptr.To[int32](3),
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "group2_c"}}},
},
},
).
WithSubmitterPodTemplate(corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "ray-job-submitter",
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("50m"),
corev1.ResourceMemory: resource.MustParse("100Mi"),
},
},
},
},
RestartPolicy: corev1.RestartPolicyNever,
},
}).
Obj()),
wantPodSets: func(rayJob *RayJob) []kueue.PodSet {
return []kueue.PodSet{
{
Name: headGroupPodSetName,
Count: 1,
Template: *rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.DeepCopy(),
},
{
Name: "group1",
Count: 1,
Template: *rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.DeepCopy(),
},
{
Name: "group2",
Count: 3,
Template: *rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[1].Template.DeepCopy(),
},
{
Name: "submitter",
Count: 1,
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "ray-job-submitter",
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("50m"),
corev1.ResourceMemory: resource.MustParse("100Mi"),
},
},
},
},
RestartPolicy: corev1.RestartPolicyNever,
},
},
},
}
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
10 changes: 10 additions & 0 deletions pkg/util/testingjobs/rayjob/wrappers.go
@@ -136,6 +136,16 @@ func (j *JobWrapper) WithWorkerGroups(workers ...rayv1.WorkerGroupSpec) *JobWrapper {
return j
}

func (j *JobWrapper) WithSubmitterPodTemplate(template corev1.PodTemplateSpec) *JobWrapper {
j.Spec.SubmitterPodTemplate = &template
return j
}

func (j *JobWrapper) WithSubmissionMode(submissionMode rayv1.JobSubmissionMode) *JobWrapper {
j.Spec.SubmissionMode = submissionMode
return j
}

func (j *JobWrapper) WithHeadGroupSpec(value rayv1.HeadGroupSpec) *JobWrapper {
j.Spec.RayClusterSpec.HeadGroupSpec = value
return j
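A usage sketch of the two new builder helpers, chained the way the tests above construct their fixtures; it assumes the testingrayjob import alias the integration tests use for this package, and the container values are illustrative.

// newK8sJobModeRayJob builds a RayJob fixture in K8sJobMode with a custom
// submitter pod template (illustrative helper, not part of the commit).
func newK8sJobModeRayJob() *rayv1.RayJob {
	return testingrayjob.MakeJob("rayjob", "ns").
		WithSubmissionMode(rayv1.K8sJobMode).
		WithSubmitterPodTemplate(corev1.PodTemplateSpec{
			Spec: corev1.PodSpec{
				Containers:    []corev1.Container{{Name: "ray-job-submitter"}},
				RestartPolicy: corev1.RestartPolicyNever,
			},
		}).
		Obj()
}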
18 changes: 14 additions & 4 deletions test/integration/controller/jobs/rayjob/rayjob_controller_test.go
@@ -181,6 +181,11 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "spot",
},
}, kueue.PodSetAssignment{
Name: createdWorkload.Spec.PodSets[2].Name,
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "on-demand",
},
},
).Obj()
gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, createdWorkload, admission)).Should(gomega.Succeed())
@@ -366,7 +371,7 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.O
ginkgo.DescribeTable("Single job at different stages of progress towards completion",
func(podsReadyTestSpec podsReadyTestSpec) {
ginkgo.By("Create a job")
job := testingrayjob.MakeJob(jobName, ns.Name).Obj()
job := testingrayjob.MakeJob(jobName, ns.Name).WithSubmissionMode(rayv1.K8sJobMode).Obj()
jobQueueName := "test-queue"
job.Annotations = map[string]string{constants.QueueAnnotation: jobQueueName}
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())
@@ -394,6 +399,11 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.O
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "default",
},
}, kueue.PodSetAssignment{
Name: createdWorkload.Spec.PodSets[2].Name,
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "default",
},
},
).Obj()
gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, createdWorkload, admission)).Should(gomega.Succeed())
@@ -548,8 +558,8 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde

clusterQueue = testing.MakeClusterQueue("dev-clusterqueue").
ResourceGroup(
*testing.MakeFlavorQuotas("spot-untainted").Resource(corev1.ResourceCPU, "5").Obj(),
*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "5").Obj(),
*testing.MakeFlavorQuotas("spot-untainted").Resource(corev1.ResourceCPU, "5").Resource(corev1.ResourceMemory, "1Gi").Obj(),
*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "5").Resource(corev1.ResourceMemory, "1Gi").Obj(),
).Obj()
gomega.Expect(k8sClient.Create(ctx, clusterQueue)).Should(gomega.Succeed())
})
@@ -614,7 +624,7 @@ var _ = ginkgo.Describe("Job controller with preemption enabled", ginkgo.Ordered

clusterQueue = testing.MakeClusterQueue("clusterqueue").
ResourceGroup(
*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "4").Obj(),
*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "5").Resource(corev1.ResourceMemory, "4Gi").Obj(),
).
Preemption(kueue.ClusterQueuePreemption{
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
6 changes: 6 additions & 0 deletions test/integration/webhook/jobs/raycluster_webhook_test.go
@@ -153,6 +153,12 @@ var _ = ginkgo.Describe("RayCluster Webhook", func() {
corev1.ResourceCPU: "default",
},
},
kueue.PodSetAssignment{
Name: createdWorkload.Spec.PodSets[2].Name,
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "default",
},
},
).Obj()
gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, createdWorkload, admission)).To(gomega.Succeed())
util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, createdWorkload)
