KubeRay and kueue.sh integration
oginskis committed Mar 1, 2023
1 parent 5876b1d commit 1ab1fdd
Showing 5 changed files with 412 additions and 27 deletions.
4 changes: 4 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
@@ -11743,6 +11743,10 @@ spec:
description: ShutdownAfterJobFinishes will determine whether to delete
the ray cluster once rayJob succeed or fail.
type: boolean
suspend:
description: suspend specifies whether the RayJob controller should
create a RayCluster instance
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up RayCluster.
format: int32
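For context, here is a minimal RayJob manifest that exercises the new field. It is an illustrative sketch only: the name, entrypoint, image, and cluster spec are placeholders, and only the suspend field comes from this commit. A queueing system such as Kueue would create the RayJob with suspend set to true and later clear it to admit the job.

# Illustrative RayJob manifest (placeholder values); only `suspend` is introduced by this commit.
apiVersion: ray.io/v1alpha1
kind: RayJob
metadata:
  name: rayjob-sample
spec:
  suspend: true                    # while true, the controller creates no RayCluster
  shutdownAfterJobFinishes: true
  entrypoint: python /home/ray/samples/sample_code.py
  rayClusterSpec:
    rayVersion: '2.3.0'
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        spec:
          containers:
            - name: ray-head
              image: rayproject/ray:2.3.0

Once suspend is flipped back to false, the controller proceeds to create the RayCluster and submit the job as usual.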
3 changes: 3 additions & 0 deletions ray-operator/apis/ray/v1alpha1/rayjob_types.go
@@ -39,6 +39,7 @@ const (
JobDeploymentStatusRunning JobDeploymentStatus = "Running"
JobDeploymentStatusFailedToGetJobStatus JobDeploymentStatus = "FailedToGetJobStatus"
JobDeploymentStatusComplete JobDeploymentStatus = "Complete"
JobDeploymentStatusSuspended JobDeploymentStatus = "Suspended"
)

// RayJobSpec defines the desired state of RayJob
@@ -61,6 +62,8 @@ type RayJobSpec struct {
RayClusterSpec *RayClusterSpec `json:"rayClusterSpec,omitempty"`
// clusterSelector is used to select running rayclusters by labels
ClusterSelector map[string]string `json:"clusterSelector,omitempty"`
// suspend specifies whether the RayJob controller should create a RayCluster instance
Suspend bool `json:"suspend,omitempty"`
}

// RayJobStatus defines the observed state of RayJob
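To connect the new API pieces: a RayJob held by the flag surfaces the new Suspended deployment status on its status subresource. The sketch below is illustrative only, assuming the existing RayJobStatus JSON field names (jobStatus, jobDeploymentStatus, rayClusterStatus); the exact values are set by the controller changes further down.

# Sketch of the status reported for a suspended RayJob (illustrative values).
status:
  jobDeploymentStatus: Suspended   # the new JobDeploymentStatusSuspended value
  jobStatus: STOPPED               # set when a running job is stopped before its cluster is deleted
  rayClusterStatus: {}             # cleared, since no RayCluster exists while suspended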
4 changes: 4 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayjobs.yaml
@@ -11743,6 +11743,10 @@ spec:
description: ShutdownAfterJobFinishes will determine whether to delete
the ray cluster once rayJob succeed or fail.
type: boolean
suspend:
description: suspend specifies whether the RayJob controller should
create a RayCluster instance
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up RayCluster.
format: int32
117 changes: 90 additions & 27 deletions ray-operator/controllers/ray/rayjob_controller.go
@@ -15,6 +15,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
@@ -150,6 +151,20 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
err = r.updateState(ctx, rayJobInstance, nil, rayJobInstance.Status.JobStatus, rayv1alpha1.JobDeploymentStatusFailedToGetOrCreateRayCluster, err)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
// If there is no cluster instance and no error, suspend the job deployment
if rayClusterInstance == nil {
// Already suspended?
if rayJobInstance.Status.JobDeploymentStatus == rayv1alpha1.JobDeploymentStatusSuspended {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
err = r.updateState(ctx, rayJobInstance, nil, rayJobInstance.Status.JobStatus, rayv1alpha1.JobDeploymentStatusSuspended, err)
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
r.Log.Info("rayJob suspended", "RayJob", rayJobInstance.Name)
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Suspended", "Suspended RayJob %s", rayJobInstance.Name)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}

// Always update RayClusterStatus along with jobStatus and jobDeploymentStatus updates.
rayJobInstance.Status.RayClusterStatus = rayClusterInstance.Status
@@ -213,9 +228,44 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{}, err
}

// Job may take a long time to start and finish, let's just periodically requeue the job and check status.
if isJobPendingOrRunning(jobInfo.JobStatus) && rayJobInstance.Status.JobDeploymentStatus == rayv1alpha1.JobDeploymentStatusRunning {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
if rayJobInstance.Status.JobDeploymentStatus == rayv1alpha1.JobDeploymentStatusRunning {
// If suspend flag is set AND
// the RayJob is submitted against the RayCluster created by THIS job, then
// try to gracefully stop the Ray job and delete (suspend) the cluster
if isSuspendFlagSet(rayJobInstance) && len(rayJobInstance.Spec.ClusterSelector) == 0 {
info, err := rayDashboardClient.GetJobInfo(rayJobInstance.Status.JobId)
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
if !rayv1alpha1.IsJobTerminal(info.JobStatus) {
err := rayDashboardClient.StopJob(rayJobInstance.Status.JobId, &r.Log)
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
}
err = r.updateState(ctx, rayJobInstance, jobInfo, rayv1alpha1.JobStatusStopped, rayJobInstance.Status.JobDeploymentStatus, nil)
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
_, err = r.deleteCluster(ctx, rayJobInstance)
if err != nil && !errors.IsNotFound(err) {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}
// Since the RayCluster instance is gone, also remove its status from the RayJob resource
rayJobInstance.Status.RayClusterStatus = rayv1alpha1.RayClusterStatus{}
err = r.updateState(ctx, rayJobInstance, jobInfo, rayv1alpha1.JobStatusStopped, rayv1alpha1.JobDeploymentStatusSuspended, nil)
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
r.Log.Info("rayJob suspended", "RayJob", rayJobInstance.Name)
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Suspended", "Suspended RayJob %s", rayJobInstance.Name)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}
// Job may take a long time to start and finish, let's just periodically requeue the job and check status.
if isJobPendingOrRunning(jobInfo.JobStatus) {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}
}

// Let's use rayJobInstance.Status.JobStatus to make sure we only delete cluster after the CR is updated.
@@ -231,34 +281,38 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
}
}

r.Log.Info("shutdownAfterJobFinishes set to true, we will delete cluster",
"RayJob", rayJobInstance.Name, "clusterName", fmt.Sprintf("%s/%s", rayJobInstance.Namespace, rayJobInstance.Status.RayClusterName))
clusterIdentifier := types.NamespacedName{
Name: rayJobInstance.Status.RayClusterName,
Namespace: rayJobInstance.Namespace,
}
cluster := rayv1alpha1.RayCluster{}
if err := r.Get(ctx, clusterIdentifier, &cluster); err != nil {
if !errors.IsNotFound(err) {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
r.Log.Info("The associated cluster has already been deleted and it cannot be found", "RayCluster", clusterIdentifier)
} else {
if cluster.DeletionTimestamp != nil {
r.Log.Info("The cluster deletion is ongoing.", "rayjob", rayJobInstance.Name, "raycluster", cluster.Name)
} else {
if err := r.Delete(ctx, &cluster); err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
r.Log.Info("The associated cluster is deleted", "RayCluster", clusterIdentifier)
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Deleted", "Deleted cluster %s", rayJobInstance.Status.RayClusterName)
return ctrl.Result{Requeue: true}, nil
}
}
return r.deleteCluster(ctx, rayJobInstance)
}
}
return ctrl.Result{}, nil
}

func (r *RayJobReconciler) deleteCluster(ctx context.Context, rayJobInstance *rayv1alpha1.RayJob) (reconcile.Result, error) {
clusterIdentifier := types.NamespacedName{
Name: rayJobInstance.Status.RayClusterName,
Namespace: rayJobInstance.Namespace,
}
cluster := rayv1alpha1.RayCluster{}
if err := r.Get(ctx, clusterIdentifier, &cluster); err != nil {
if !errors.IsNotFound(err) {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
r.Log.Info("The associated cluster has already been deleted and it cannot be found", "RayCluster", clusterIdentifier)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
} else {
if cluster.DeletionTimestamp != nil {
r.Log.Info("The cluster deletion is ongoing.", "rayjob", rayJobInstance.Name, "raycluster", cluster.Name)
} else {
if err := r.Delete(ctx, &cluster); err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
r.Log.Info("The associated cluster is deleted", "RayCluster", clusterIdentifier)
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Deleted", "Deleted cluster %s", rayJobInstance.Status.RayClusterName)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}
}
return ctrl.Result{}, nil
}

@@ -272,6 +326,11 @@ func isJobPendingOrRunning(status rayv1alpha1.JobStatus) bool {
return (status == rayv1alpha1.JobStatusPending) || (status == rayv1alpha1.JobStatusRunning)
}

// isSuspendFlagSet indicates whether the suspend flag is set on the job.
func isSuspendFlagSet(job *rayv1alpha1.RayJob) bool {
return job.Spec.Suspend
}

// SetupWithManager sets up the controller with the Manager.
func (r *RayJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
@@ -388,11 +447,15 @@ func (r *RayJobReconciler) getOrCreateRayClusterInstance(ctx context.Context, ra
return nil, err
}

// one special case is the job is complete status and cluster has been recycled.
// special case: the job is in complete status and the cluster has been recycled.
if isJobSucceedOrFailed(rayJobInstance.Status.JobStatus) && rayJobInstance.Status.JobDeploymentStatus == rayv1alpha1.JobDeploymentStatusComplete {
r.Log.Info("The cluster has been recycled for the job, skip duplicate creation", "rayjob", rayJobInstance.Name)
return nil, err
}
// special case: don't create a cluster instance and don't return an error if the suspend flag of the job is true
if isSuspendFlagSet(rayJobInstance) {
return nil, nil
}

r.Log.Info("RayCluster not found, creating rayCluster!", "raycluster", rayClusterNamespacedName)
rayClusterInstance, err = r.constructRayClusterForRayJob(rayJobInstance, rayClusterInstanceName)
