[Feature] Support suspend in RayJob #926
Conversation
Force-pushed from e4203ad to 5f3f0a5
Thanks for the feature request. We will have a look and come back to you soon.
Hello @Jeffwan
I quickly looked through https://github.com/ray-project/community and I didn't see any recurring meeting. Is there any venue where we can meet if you have questions about why this is important? @Jeffwan
Thanks for bringing this up -- we do have a biweekly KubeRay sync, and @gvspraveen can check on adding this to the linked community page. In this case, we discussed internally and don't have any questions, so there's no need to present anything at the sync.
That community page seems to be for all of ray-project, so I'm not sure whether adding meeting links is appropriate there. But there is a Slack group linked in the Ray docs. You can reach out to us there and we can add you to the meeting.
Force-pushed from 568a2b4 to 1ab1fdd
@architkulkarni Could you re-run the workflows?
Force-pushed from 5bd476d to 224e3ce
@@ -61,6 +62,8 @@ type RayJobSpec struct {
    RayClusterSpec *RayClusterSpec `json:"rayClusterSpec,omitempty"`
    // clusterSelector is used to select running rayclusters by labels
    ClusterSelector map[string]string `json:"clusterSelector,omitempty"`
    // suspend specifies whether the RayJob controller should create a RayCluster instance
Worth explaining what happens with the RayCluster on transitions from false to true and true to false.
Does it affect .status.startTime?
Done. .status.startTime is updated for each new start.
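For reference, a rough sketch of how the documented field could read once the transition behavior is spelled out; the comment wording and exact field shape below are assumptions, not necessarily the merged code:

// Suspend specifies whether the RayJob controller should create a RayCluster instance.
// If a RayJob is applied with suspend == true, the RayCluster is not created and the Ray
// job is not submitted. Flipping true -> false creates the RayCluster and submits the job
// (recording a fresh .status.startTime); flipping false -> true while the job is running
// stops the job and deletes the RayCluster.
Suspend bool `json:"suspend,omitempty"`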
@mcariatm Could you also reset .status.endTime when a job is resumed (transitions from suspend true -> false)?
Done, line 400 in rayjob_controller.go
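A minimal sketch of the resume path being discussed, assuming .status.startTime and .status.endTime are *metav1.Time fields (metav1 being the usual k8s.io/apimachinery/pkg/apis/meta/v1 alias); the surrounding condition and variable names are illustrative, not the exact code at line 400:

// Resuming from a suspended state: record a new start time for the fresh run
// and clear the end time left over from the previous, suspended run.
if !rayJobInstance.Spec.Suspend && rayJobInstance.Status.JobDeploymentStatus == rayv1alpha1.JobDeploymentStatusSuspended {
    now := metav1.Now()
    rayJobInstance.Status.StartTime = &now
    rayJobInstance.Status.EndTime = nil
}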
@@ -272,6 +326,11 @@ func isJobPendingOrRunning(status rayv1alpha1.JobStatus) bool {
    return (status == rayv1alpha1.JobStatusPending) || (status == rayv1alpha1.JobStatusRunning)
}

// isSuspendFlagSet indicates whether the job has a suspended flag set.
func isSuspendFlagSet(job *rayv1alpha1.RayJob) bool {
This one-liner doesn't add value. I would just inline job.Spec.Suspend where necessary.
done
@architkulkarni, @Jeffwan I found an issue. When a RayJob is deleted and then applied again after a short time, we get this error.
As far as I can tell, this happens when the old RayCluster is still in terminating status, and the job remains in FAILED status with this error. If the jobs are orchestrated by Kueue, this problem may occur more and more often.
@mcariatm Currently, we use the old cluster id when the Ray cluster instance is re-created, so if the suspend … Also, consider resetting …
What happens with the RayCluster object when you call …? If the object persists, I think it's more appropriate to wait for it to finish before creating the new RayCluster. If not, what @oginskis suggests sounds like a good option.
After a long time of testing, I found that the problem does not depend on the clusters that are in terminating status.
@@ -160,14 +160,14 @@ func FetchDashboardURL(ctx context.Context, log *logr.Logger, cli client.Client,

func (r *RayDashboardClient) InitClient(url string) {
    r.client = http.Client{
-       Timeout: 2 * time.Second,
+       Timeout: 20 * time.Second,
Maybe make this timeout configurable?
maybe not in this PR... 😆
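If it were made configurable in a follow-up, one option is an environment variable with the new 20s value as the fallback; the RAY_DASHBOARD_CLIENT_TIMEOUT name below is hypothetical and not part of this PR:

import (
    "os"
    "time"
)

// dashboardClientTimeout returns the HTTP client timeout, falling back to 20s
// when the hypothetical RAY_DASHBOARD_CLIENT_TIMEOUT variable is unset or invalid.
func dashboardClientTimeout() time.Duration {
    if v := os.Getenv("RAY_DASHBOARD_CLIENT_TIMEOUT"); v != "" {
        if d, err := time.ParseDuration(v); err == nil {
            return d
        }
    }
    return 20 * time.Second
}

InitClient could then set Timeout: dashboardClientTimeout() instead of a hard-coded value.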
Thanks @oginskis @mcariatm for this contribution! I will start reviewing this PR this week. By the way, there seem to be some updates in dashboard_httpclient. It is highly possible that KubeRay will redesign RayJob in April. This means that we will likely remove the Dashboard HTTP client, because submitting a job to RayCluster via the HTTP client is not an idempotent operation. It sometimes causes multiple job creations within a single RayJob. Please see #756 for more context. If this PR heavily relies on that part, it may not be merged in this release (v0.5.0 is planned to be out at the end of March or early April). The KubeRay team will do our best to get this PR merged in v0.6.0.
rayJobInstance.Status.DashboardURL = ""
rayJobInstance.Status.JobId = ""
rayJobInstance.Status.Message = ""
err = r.updateState(ctx, rayJobInstance, jobInfo, rayv1alpha1.JobStatusStopped, rayv1alpha1.JobDeploymentStatusSuspended, nil)
Why is this in a separate API call? Can it be combined with the one on line 246?
Seconding this question
It looks like this was addressed.
/cc
/retitle [Feature] Support suspend in RayJob
/cc
Took a pass at this PR. Thanks for this contribution and the detailed PR description! As I mentioned above, it is highly possible that KubeRay will redesign RayJob in April (after v0.5.0) and remove the Dashboard HTTP client. We may either use a head Pod command or a separate Kubernetes Job to submit the job to the RayCluster. Again, the KubeRay team will do our best to get this PR merged in v0.6.0.
I'm not familiar with the Kubernetes built-in Job and am curious about the use case for suspend. Could you please explain it to me? I ask because when suspend is set to true while the RayJob is running, the RayCluster is deleted, and the job must be rerun entirely rather than resuming from its previous progress. What's the difference compared with submitting a new RayJob? Thanks!
The documentation for Job is here: https://kubernetes.io/docs/concepts/workloads/controllers/job/#suspending-a-job
The idea is that there is a higher-level controller that can determine that the cluster is full and keep the job suspended until it can run.
That said, if a particular application or framework implements checkpointing, the job could resume where it was suspended.
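To illustrate that workflow, here is a sketch of how a queueing controller might resume a suspended RayJob with a patch. It assumes the controller-runtime client and an assumed rayv1alpha1 import path; the function itself is a placeholder, not Kueue's actual code:

import (
    "context"

    rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// resumeRayJob flips spec.suspend to false so the RayJob controller creates the
// RayCluster and submits the job once the queue decides there is capacity.
func resumeRayJob(ctx context.Context, c client.Client, rayJob *rayv1alpha1.RayJob) error {
    patch := client.MergeFrom(rayJob.DeepCopy())
    rayJob.Spec.Suspend = false
    return c.Patch(ctx, rayJob, patch)
}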
/retitle [Feature] Support suspend in RayJob |
LGTM from my side
    return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
if !rayv1alpha1.IsJobTerminal(info.JobStatus) {
    err := rayDashboardClient.StopJob(ctx, rayJobInstance.Status.JobId, &r.Log)
it looks like this was addressed
rayJobInstance.Status.DashboardURL = ""
rayJobInstance.Status.JobId = ""
rayJobInstance.Status.Message = ""
err = r.updateState(ctx, rayJobInstance, jobInfo, rayv1alpha1.JobStatusStopped, rayv1alpha1.JobDeploymentStatusSuspended, nil)
It looks like this was addressed.
Force-pushed from 76bad3b to 8eb513d
@architkulkarni please rerun the linter and the other checks.
@@ -6,7 +6,7 @@
package v1alpha1

import (
-   v1 "k8s.io/api/core/v1"
+   "k8s.io/api/core/v1"
"k8s.io/api/core/v1" | |
v1 "k8s.io/api/core/v1" |
Linter complains about this change for some reason... https://github.com/ray-project/kuberay/actions/runs/4876658441/jobs/8702043318?pr=926#step:11:19
I think it happens when running make test or possibly other commands. I should try to get to the bottom of it.
please rerun
Force-pushed from d810ed9 to 3371dee
https://github.com/ray-project/kuberay/actions/runs/4879883408/jobs/8718473999?pr=926#step:5:62
A basic question: what's the purpose of passing context to all the HTTP requests? Is that a general best practice, or does it fix something specific, separately from increasing the timeout?
Apart from that question, looks good to me pending the lint failure. Thanks for the contribution!
Its main purpose is to extend the timeout but keep it within the bounds of the reconcile call. On slow machines (minikube on our dev laptops) we observed that the job name collision was a result of not waiting for the initial request to complete on the server (dashboard API server) and then issuing the same request again.
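A minimal sketch of that pattern, with each dashboard request bound to the reconcile call's context; the helper name is illustrative, and it assumes the context, io, and net/http imports plus the r.client field shown above:

// doGet issues a GET with the caller's context, so an in-flight dashboard
// request is cancelled as soon as the surrounding reconcile context expires,
// instead of lingering and colliding with a retried submission.
func (r *RayDashboardClient) doGet(ctx context.Context, url string) ([]byte, error) {
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        return nil, err
    }
    resp, err := r.client.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    return io.ReadAll(resp.Body)
}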
Force-pushed from 3371dee to ebd5054
Can you try again please?
@kevin85421 can you have a look?
UpdateDeployments(specs rayv1alpha1.ServeDeploymentGraphSpec) error
GetDeploymentsStatus() (*ServeDeploymentStatuses, error)
GetDeployments(context.Context) (string, error)
UpdateDeployments(ctx context.Context, specs rayv1alpha1.ServeDeploymentGraphSpec) error
Suggested change:
-   UpdateDeployments(ctx context.Context, specs rayv1alpha1.ServeDeploymentGraphSpec) error
+   UpdateDeployments(ctx context.Context, spec rayv1alpha1.ServeDeploymentGraphSpec) error
Just one merge conflict here.
I've tested this manually and discussed offline with @kevin85421, and we are ready to merge this. @oginskis if you can address the merge conflict, I will merge the PR.
This reverts commit e19a4c0.
…eviously hard-coded 2 second timeout.
Force-pushed from ebd5054 to 3c0496b
I rebased it. Please rerun the tests.
Force-pushed from 3c0496b to fcaa614
Why are these changes needed?
Native Kubernetes Jobs have a suspend flag that allows temporarily suspending a Job's execution and resuming it later, or starting Jobs in a suspended state and letting a custom controller, such as Kueue, decide later when to start them.
So we are adding it to the RayJob spec for consistency. Moreover, some frameworks like Kubeflow are adding it, so it is becoming standard functionality. An example implementation for MPIJob: kubeflow/mpi-operator#511
Implementation details
If a RayJob is created with spec.suspend == true, then the RayCluster instance (with its corresponding Kubernetes resources) is not created and the Ray job is not submitted to the cluster. The JobDeploymentStatus is set to Suspended and the corresponding event is issued. The RayJob remains in this state until somebody unsuspends the job.
If suspend flips from true to false, then the RayJob controller immediately creates a RayCluster instance and submits the job.
If suspend flips from false to true while the job is running, then the RayJob controller tries to gracefully stop the job and deletes the RayCluster instance (with its underlying Kubernetes resources). The JobDeploymentStatus is set to Suspended, JobStatus is set to STOPPED, and the corresponding event is issued.
Edge case: the suspend flag is ignored if a RayJob is submitted against an existing RayCluster instance (matched with ClusterSelector), since we can't delete a RayCluster created by somebody else.
No Kueue-specific code leaked into the KubeRay implementation.
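For illustration, a condensed sketch of the false -> true path, pieced together from the snippets shown in this review; the deleteCluster helper is a placeholder and error handling is abbreviated:

// Suspend was requested while the job is still active: stop the Ray job,
// delete the RayCluster, clear the per-run status fields, and mark the job
// deployment status as Suspended.
if rayJobInstance.Spec.Suspend && isJobPendingOrRunning(rayJobInstance.Status.JobStatus) {
    if !rayv1alpha1.IsJobTerminal(jobInfo.JobStatus) {
        if err := rayDashboardClient.StopJob(ctx, rayJobInstance.Status.JobId, &r.Log); err != nil {
            return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
        }
    }
    if err := r.deleteCluster(ctx, rayJobInstance); err != nil { // placeholder helper
        return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
    }
    rayJobInstance.Status.DashboardURL = ""
    rayJobInstance.Status.JobId = ""
    rayJobInstance.Status.Message = ""
    err := r.updateState(ctx, rayJobInstance, jobInfo, rayv1alpha1.JobStatusStopped, rayv1alpha1.JobDeploymentStatusSuspended, nil)
    return ctrl.Result{}, err
}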
Contributors from Kueue/Kubernetes cc'ed: @alculquicondor @mwielgus