Improve RayJob controller quality to alpha (#398)
1. Implement `spec.shutdownAfterJobFinishes`: delete the RayCluster once the job finishes if this field is set (see the hedged sketch below)
2. Add a new status value `JobDeploymentStatusComplete` to indicate that the RayJob is complete
3. Add status fields `Message`, `StartTime`, and `EndTime` to expose more job status information
4. Improve log usage in the operators and make it consistent
5. Optimize `requeueAfter` for some time-consuming operations, such as waiting for the dashboard to become ready (the container takes time to start)
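
For context, a minimal sketch of a RayJob using the new spec fields, written against the Go types changed in rayjob_types.go below. The entrypoint string and object name are illustrative; the Entrypoint field itself is inferred from the CRD's required entrypoint entry, and the elided cluster template is a placeholder, not content from this commit:

package main

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
)

// exampleRayJob sketches a RayJob whose RayCluster is deleted roughly 30 seconds after the job finishes.
func exampleRayJob() *rayv1alpha1.RayJob {
	ttl := int32(30)
	return &rayv1alpha1.RayJob{
		ObjectMeta: metav1.ObjectMeta{Name: "sample-rayjob"},
		Spec: rayv1alpha1.RayJobSpec{
			Entrypoint:               "python /home/ray/samples/sample_code.py",
			ShutdownAfterJobFinishes: true, // delete the RayCluster once the job finishes
			TTLSecondsAfterFinished:  &ttl, // only honored when ShutdownAfterJobFinishes is true
			RayClusterSpec:           rayv1alpha1.RayClusterSpec{ /* cluster template elided */ },
		},
	}
}
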
Jeffwan authored Jul 26, 2022

1 parent 09c7da3 commit 0c98cf2
Showing 9 changed files with 298 additions and 128 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/test-job.yaml
@@ -26,8 +26,10 @@ jobs:
         # Default value should work for both pull_request and merge(push) event.
         ref: ${{github.event.pull_request.head.sha}}

-    - name: Install goimports
-      run: go get golang.org/x/tools/cmd/goimports
+    - name: Install goimports and gofumpt
+      run: |
+        go get golang.org/x/tools/cmd/goimports
+        go install mvdan.cc/gofumpt@latest

     - name: Run gofmt
       uses: Jerome1337/[email protected]
@@ -89,9 +91,7 @@ jobs:
       if: failure()

     - name: Run gofumpt
-      run: |
-        go install mvdan.cc/gofumpt@latest
-        test -z "$(set -o pipefail && $(go env GOPATH)/bin/gofumpt -l apiserver/ ray-operator/ cli/ | tee gofumpt.out)" || { cat gofumpt.out && exit 1; }
+      run: test -z "$(set -o pipefail && $(go env GOPATH)/bin/gofumpt -l apiserver/ ray-operator/ cli/ | tee gofumpt.out)" || { cat gofumpt.out && exit 1; }

     - name: Open this to see how to fix gofumpt if it fails
       run: |
22 changes: 18 additions & 4 deletions ray-operator/apis/ray/v1alpha1/rayjob_types.go
@@ -7,6 +7,7 @@ import (
 // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
 // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

+// JobStatus is the Ray Job Status. https://docs.ray.io/en/latest/cluster/jobs-package-ref.html#jobstatus
 type JobStatus string

 const (
@@ -17,6 +18,7 @@ const (
     JobStatusFailed JobStatus = "FAILED"
 )

+// JobDeploymentStatus indicates RayJob status including RayCluster lifecycle management and Job submission
 type JobDeploymentStatus string

 const (
@@ -26,6 +28,7 @@ const (
     JobDeploymentStatusFailedJobDeploy JobDeploymentStatus = "FailedJobDeploy"
     JobDeploymentStatusRunning JobDeploymentStatus = "Running"
     JobDeploymentStatusFailedToGetJobStatus JobDeploymentStatus = "FailedToGetJobStatus"
+    JobDeploymentStatusComplete JobDeploymentStatus = "Complete"
 )

 // RayJobSpec defines the desired state of RayJob
@@ -37,10 +40,14 @@ type RayJobSpec struct {
     Metadata map[string]string `json:"metadata,omitempty"`
     // RuntimeEnv is base64 encoded.
     RuntimeEnv string `json:"runtimeEnv,omitempty"`
-    // TODO: If set to true, the rayCluster will be deleted after the rayJob finishes
-    ShutdownAfterJobFinishes bool `json:"shutdownAfterJobFinishes,omitempty"`
     // If jobId is not set, a new jobId will be auto-generated.
-    JobId string `json:"jobId,omitempty"`
+    JobId string `json:"jobId,omitempty"`
+    // ShutdownAfterJobFinishes will determine whether to delete the ray cluster once rayJob succeed or failed.
+    ShutdownAfterJobFinishes bool `json:"shutdownAfterJobFinishes,omitempty"`
+    // TTLSecondsAfterFinished is the TTL to clean up RayCluster.
+    // It's only working when ShutdownAfterJobFinishes set to true.
+    TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty"`
+    // RayClusterSpec is the cluster template to run the job
     RayClusterSpec RayClusterSpec `json:"rayClusterSpec,omitempty"`
     // clusterSelector is used to select running rayclusters by labels
     ClusterSelector map[string]string `json:"clusterSelector,omitempty"`
@@ -55,7 +62,14 @@ type RayJobStatus struct {
     DashboardURL string `json:"dashboardURL,omitempty"`
     JobStatus JobStatus `json:"jobStatus,omitempty"`
     JobDeploymentStatus JobDeploymentStatus `json:"jobDeploymentStatus,omitempty"`
-    RayClusterStatus RayClusterStatus `json:"rayClusterStatus,omitempty"`
+    Message string `json:"message,omitempty"`
+    // Represents time when the job was acknowledged by the Ray cluster.
+    // It is not guaranteed to be set in happens-before order across separate operations.
+    // It is represented in RFC3339 form
+    StartTime *metav1.Time `json:"startTime,omitempty"`
+    // Represents time when the job was ended.
+    EndTime *metav1.Time `json:"endTime,omitempty"`
+    RayClusterStatus RayClusterStatus `json:"rayClusterStatus,omitempty"`
 }

//+kubebuilder:object:root=true
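
As a hedged illustration (not code from this commit) of how the new status fields and the Complete deployment status might be populated together, using the ConvertUnixTimeToMetav1Time helper added in util.go further down. The helper name markJobTerminal and the assumption that timestamps arrive as Unix seconds are mine, and JobStatusSucceeded is assumed to exist alongside the JobStatusFailed constant shown above:

package ray

import (
	rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
	"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

// markJobTerminal is a hypothetical helper showing how the new status fields could be filled
// in once the Ray dashboard reports a terminal job state.
func markJobTerminal(rayJob *rayv1alpha1.RayJob, jobStatus rayv1alpha1.JobStatus, msg string, startUnix, endUnix int64) {
	rayJob.Status.JobStatus = jobStatus
	rayJob.Status.Message = msg
	rayJob.Status.StartTime = utils.ConvertUnixTimeToMetav1Time(startUnix)
	rayJob.Status.EndTime = utils.ConvertUnixTimeToMetav1Time(endUnix)
	// JobStatusSucceeded is assumed; JobStatusFailed is shown in the diff above.
	if jobStatus == rayv1alpha1.JobStatusSucceeded || jobStatus == rayv1alpha1.JobStatusFailed {
		rayJob.Status.JobDeploymentStatus = rayv1alpha1.JobDeploymentStatusComplete
	}
}
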
13 changes: 13 additions & 0 deletions ray-operator/apis/ray/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

25 changes: 21 additions & 4 deletions ray-operator/config/crd/bases/ray.io_rayjobs.yaml
@@ -53,8 +53,7 @@ spec:
       description: Metadata is data to store along with this job.
       type: object
     rayClusterSpec:
-      description: 'EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
-        NOTE: json tags are required.'
+      description: RayClusterSpec is the cluster template to run the job
       properties:
         autoscalerOptions:
           description: AutoscalerOptions specifies optional configuration
@@ -11493,9 +11492,13 @@ spec:
       description: RuntimeEnv is base64 encoded.
       type: string
     shutdownAfterJobFinishes:
-      description: 'TODO: If set to true, the rayCluster will be deleted
-        after the rayJob finishes'
+      description: ShutdownAfterJobFinishes will determine whether to delete
+        the ray cluster once rayJob succeed or fai
       type: boolean
+    ttlSecondsAfterFinished:
+      description: TTLSecondsAfterFinished is the TTL to clean up RayCluster.
+      format: int32
+      type: integer
   required:
   - entrypoint
   type: object
@@ -11504,13 +11507,22 @@ spec:
   properties:
     dashboardURL:
       type: string
+    endTime:
+      description: Represents time when the job was ended.
+      format: date-time
+      type: string
     jobDeploymentStatus:
+      description: JobDeploymentStatus indicates RayJob status including
+        RayCluster lifecycle management and Job submis
       type: string
     jobId:
       description: 'INSERT ADDITIONAL STATUS FIELD - define observed state
         of cluster Important: Run "make" to regenerat'
       type: string
     jobStatus:
+      description: JobStatus is the Ray Job Status. https://docs.ray.io/en/latest/cluster/jobs-package-ref.
       type: string
+    message:
+      type: string
     rayClusterName:
       type: string
@@ -11553,6 +11565,11 @@ spec:
           state of cluster Important: Run "make" to regenerat'
         type: string
       type: object
+    startTime:
+      description: Represents time when the job was acknowledged by the
+        Ray cluster.
+      format: date-time
+      type: string
   type: object
 type: object
 served: true
134 changes: 66 additions & 68 deletions ray-operator/controllers/ray/raycluster_controller.go

Large diffs are not rendered by default.

200 changes: 159 additions & 41 deletions ray-operator/controllers/ray/rayjob_controller.go

Large diffs are not rendered by default.
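
The rayjob_controller.go changes are not rendered above. As a rough, hedged sketch of the shutdown behavior described in the commit message (not the actual controller code; the function name, the requeue strategy, and passing a bare client.Client are assumptions), the cluster-deletion decision might look like this:

package ray

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/api/errors"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
)

// handleJobFinished sketches ShutdownAfterJobFinishes: once the job is terminal, wait out the
// optional TTL via RequeueAfter instead of tight polling, then delete the RayCluster.
func handleJobFinished(ctx context.Context, c client.Client, rayJob *rayv1alpha1.RayJob, cluster *rayv1alpha1.RayCluster) (ctrl.Result, error) {
	if !rayJob.Spec.ShutdownAfterJobFinishes {
		return ctrl.Result{}, nil
	}
	if ttl := rayJob.Spec.TTLSecondsAfterFinished; ttl != nil && rayJob.Status.EndTime != nil {
		expireAt := rayJob.Status.EndTime.Add(time.Duration(*ttl) * time.Second)
		if remaining := time.Until(expireAt); remaining > 0 {
			// TTL not reached yet: requeue for the remaining duration.
			return ctrl.Result{RequeueAfter: remaining}, nil
		}
	}
	if err := c.Delete(ctx, cluster); err != nil && !errors.IsNotFound(err) {
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}
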

4 changes: 2 additions & 2 deletions ray-operator/controllers/ray/rayservice_controller.go
@@ -625,7 +625,7 @@ func (r *RayServiceReconciler) reconcileIngress(ctx context.Context, rayServiceI
     }
     if createErr := r.Create(ctx, ingress); createErr != nil {
         if errors.IsAlreadyExists(createErr) {
-            log.Info("Ingress already exists,no need to create")
+            r.Log.Info("Ingress already exists,no need to create")
             return nil
         }
         r.Log.Error(createErr, "Ingress create error!", "Ingress.Error", createErr)
@@ -674,7 +674,7 @@ func (r *RayServiceReconciler) reconcileServices(ctx context.Context, rayService
     }
     if createErr := r.Create(ctx, raySvc); createErr != nil {
         if errors.IsAlreadyExists(createErr) {
-            log.Info("raySvc already exists,no need to create")
+            r.Log.Info("raySvc already exists,no need to create")
             return nil
         }
         r.Log.Error(createErr, "raySvc create error!", "raySvc.Error", createErr)
11 changes: 7 additions & 4 deletions ray-operator/controllers/ray/utils/dashboard_httpclient.go
@@ -134,7 +134,7 @@ func FetchDashboardURL(ctx context.Context, log *logr.Logger, cli client.Client,
         return "", err
     }

-    log.V(1).Info("fetchDashboardURL ", "dashboard service found", headSvc.Name)
+    log.V(3).Info("fetchDashboardURL ", "dashboard service found", headSvc.Name)
     servicePorts := headSvc.Spec.Ports
     dashboardPort := int32(-1)

@@ -319,7 +319,10 @@ func (r *RayDashboardClient) GetJobInfo(jobId string) (*RayJobInfo, error) {
         return nil, nil
     }

-    body, _ := ioutil.ReadAll(resp.Body)
+    body, err := ioutil.ReadAll(resp.Body)
+    if err != nil {
+        return nil, err
+    }

     var jobInfo RayJobInfo
     if err = json.Unmarshal(body, &jobInfo); err != nil {
@@ -338,7 +341,7 @@ func (r *RayDashboardClient) SubmitJob(rayJob *rayv1alpha1.RayJob, log *logr.Log
     if err != nil {
         return
     }
-    log.Info(fmt.Sprintf("Submit a ray job: %s", string(rayJobJson)))
+    log.Info("Submit a ray job", "rayJob", rayJob.Name, "jobInfo", string(rayJobJson))

     req, err := http.NewRequest(http.MethodPost, r.dashboardURL+JobPath, bytes.NewBuffer(rayJobJson))
     if err != nil {
@@ -378,7 +381,7 @@ func ConvertRayJobToReq(rayJob *rayv1alpha1.RayJob) (*RayJobRequest, error) {
     var runtimeEnv map[string]interface{}
     err = json.Unmarshal(decodeBytes, &runtimeEnv)
     if err != nil {
-        return nil, fmt.Errorf("Failed to unmarshal runtimeEnv: %v: %v", decodeBytes, err)
+        return nil, fmt.Errorf("failed to unmarshal runtimeEnv: %v: %v", decodeBytes, err)
     }
     req.RuntimeEnv = runtimeEnv
     return req, nil
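
Since ConvertRayJobToReq base64-decodes RuntimeEnv and unmarshals it as JSON, the value placed in the RayJob spec's runtimeEnv needs to be the base64 encoding of a JSON object. A minimal sketch of producing such a value (the runtime-env keys are illustrative, not taken from this commit):

package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

func main() {
	// Illustrative runtime env; keys follow Ray's runtime_env convention.
	runtimeEnv := map[string]interface{}{
		"pip":      []string{"requests==2.26.0"},
		"env_vars": map[string]string{"counter_name": "test_counter"},
	}
	raw, err := json.Marshal(runtimeEnv)
	if err != nil {
		panic(err)
	}
	// This base64 string is what would go into RayJobSpec.RuntimeEnv.
	fmt.Println(base64.StdEncoding.EncodeToString(raw))
}
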
7 changes: 7 additions & 0 deletions ray-operator/controllers/ray/utils/util.go
@@ -7,6 +7,7 @@ import (
     "sort"
     "strconv"
     "strings"
+    "time"
     "unicode"

     "k8s.io/apimachinery/pkg/util/json"
@@ -330,3 +331,9 @@ func CompareJsonStruct(objA interface{}, objB interface{}) bool {
     }
     return reflect.DeepEqual(v1, v2)
 }
+
+func ConvertUnixTimeToMetav1Time(unixTime int64) *metav1.Time {
+    t := time.Unix(unixTime, 0)
+    kt := metav1.NewTime(t)
+    return &kt
+}
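
A small usage sketch for the new helper, assuming the job timestamps arrive as Unix seconds (the literal timestamp is illustrative):

package main

import (
	"fmt"
	"time"

	"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

func main() {
	// Assume the dashboard reported a job end time as Unix seconds.
	endTime := utils.ConvertUnixTimeToMetav1Time(1658800000)
	// Printed in RFC3339 form, matching the StartTime/EndTime doc comments.
	fmt.Println(endTime.Format(time.RFC3339))
}
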
