Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RayJob] Add runtime env YAML field #1338

Merged
merged 17 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/guidance/rayjob.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ $ kubectl get pod
* `rayClusterSpec` - The spec for the Ray cluster to run the job on.
* `jobId` - _(Optional)_ Job ID to specify for the job. If not provided, one will be generated.
* `metadata` - _(Optional)_ Arbitrary user-provided metadata for the job.
* `runtimeEnv` - _(Optional)_ base64-encoded string of the runtime env json string.
* `runtimeEnvYAML` - _(Optional)_ The runtime environment configuration provided as a multi-line YAML string.
* `shutdownAfterJobFinishes` - _(Optional)_ whether to recycle the cluster after the job finishes. Defaults to false.
* `ttlSecondsAfterFinished` - _(Optional)_ TTL to clean up the cluster. This only works if `shutdownAfterJobFinishes` is set.
* `submitterPodTemplate` - _(Optional)_ Pod template spec for the pod that runs `ray job submit` against the Ray cluster.
* `runtimeEnv` - [DEPRECATED] _(Optional)_ base64-encoded string of the runtime env json string.

## RayJob Observability

Expand Down
7 changes: 6 additions & 1 deletion helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12090,7 +12090,12 @@ spec:
- headGroupSpec
type: object
runtimeEnv:
description: RuntimeEnv is base64 encoded.
description: RuntimeEnv is base64 encoded. This field is deprecated,
please use RuntimeEnvYAML instead.
type: string
runtimeEnvYAML:
description: RuntimeEnvYAML represents the runtime environment configuration
provided as a multi-line YAML string
type: string
shutdownAfterJobFinishes:
description: ShutdownAfterJobFinishes will determine whether to delete
Expand Down
4 changes: 4 additions & 0 deletions ray-operator/apis/ray/v1alpha1/rayjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ type RayJobSpec struct {
// Metadata is data to store along with this job.
Metadata map[string]string `json:"metadata,omitempty"`
// RuntimeEnv is base64 encoded.
// This field is deprecated, please use RuntimeEnvYAML instead.
RuntimeEnv string `json:"runtimeEnv,omitempty"`
// RuntimeEnvYAML represents the runtime environment configuration
// provided as a multi-line YAML string.
RuntimeEnvYAML string `json:"runtimeEnvYAML,omitempty"`
// If jobId is not set, a new jobId will be auto-generated.
JobId string `json:"jobId,omitempty"`
// ShutdownAfterJobFinishes will determine whether to delete the ray cluster once rayJob succeed or failed.
Expand Down
7 changes: 6 additions & 1 deletion ray-operator/config/crd/bases/ray.io_rayjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12090,7 +12090,12 @@ spec:
- headGroupSpec
type: object
runtimeEnv:
description: RuntimeEnv is base64 encoded.
description: RuntimeEnv is base64 encoded. This field is deprecated,
please use RuntimeEnvYAML instead.
type: string
runtimeEnvYAML:
description: RuntimeEnvYAML represents the runtime environment configuration
provided as a multi-line YAML string
type: string
shutdownAfterJobFinishes:
description: ShutdownAfterJobFinishes will determine whether to delete
Expand Down
10 changes: 9 additions & 1 deletion ray-operator/config/samples/ray-job.custom-head-svc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,15 @@ metadata:
name: rayjob-sample
spec:
entrypoint: python /home/ray/samples/sample_code.py
runtimeEnv: ewogICAgInBpcCI6IFsKICAgICAgICAicmVxdWVzdHM9PTIuMjYuMCIsCiAgICAgICAgInBlbmR1bHVtPT0yLjEuMiIKICAgIF0sCiAgICAiZW52X3ZhcnMiOiB7ImNvdW50ZXJfbmFtZSI6ICJ0ZXN0X2NvdW50ZXIifQp9Cg==

# RuntimeEnvYAML represents the runtime environment configuration provided as a multi-line YAML string.
RuntimeEnvYAML: |
pip:
- requests==2.26.0
- pendulum==2.1.2
env_vars:
counter_name: "test_counter"

# rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
rayClusterSpec:
rayVersion: '2.6.3' # should match the Ray version in the image of the containers
Expand Down
22 changes: 12 additions & 10 deletions ray-operator/config/samples/ray_v1alpha1_rayjob.shutdown.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,26 @@ metadata:
name: rayjob-sample-shutdown
spec:
entrypoint: python /home/ray/samples/sample_code.py

# shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false.
shutdownAfterJobFinishes: true

# ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes.
ttlSecondsAfterFinished: 10
# runtimeEnv decoded to '{
# "pip": [
# "requests==2.26.0",
# "pendulum==2.1.2"
# ],
# "env_vars": {
# "counter_name": "test_counter"
# }
# }'
runtimeEnv: ewogICAgInBpcCI6IFsKICAgICAgICAicmVxdWVzdHM9PTIuMjYuMCIsCiAgICAgICAgInBlbmR1bHVtPT0yLjEuMiIKICAgIF0sCiAgICAiZW52X3ZhcnMiOiB7ImNvdW50ZXJfbmFtZSI6ICJ0ZXN0X2NvdW50ZXIifQp9Cg==

# RuntimeEnvYAML represents the runtime environment configuration provided as a multi-line YAML string.
RuntimeEnvYAML: |
pip:
- requests==2.26.0
- pendulum==2.1.2
env_vars:
counter_name: "test_counter"

# Suspend specifies whether the RayJob controller should create a RayCluster instance.
# If a job is applied with the suspend field set to true, the RayCluster will not be created and we will wait for the transition to false.
# If the RayCluster is already created, it will be deleted. In the case of transition to false, a new RayCluste rwill be created.
# suspend: false

# rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
rayClusterSpec:
rayVersion: '2.5.0' # should match the Ray version in the image of the containers
Expand Down
21 changes: 11 additions & 10 deletions ray-operator/config/samples/ray_v1alpha1_rayjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,23 @@ spec:
entrypoint: python /home/ray/samples/sample_code.py
# shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false.
# shutdownAfterJobFinishes: false

# ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes.
# ttlSecondsAfterFinished: 10
# runtimeEnv decoded to '{
# "pip": [
# "requests==2.26.0",
# "pendulum==2.1.2"
# ],
# "env_vars": {
# "counter_name": "test_counter"
# }
#}'
runtimeEnv: ewogICAgInBpcCI6IFsKICAgICAgICAicmVxdWVzdHM9PTIuMjYuMCIsCiAgICAgICAgInBlbmR1bHVtPT0yLjEuMiIKICAgIF0sCiAgICAiZW52X3ZhcnMiOiB7ImNvdW50ZXJfbmFtZSI6ICJ0ZXN0X2NvdW50ZXIifQp9Cg==

# RuntimeEnvYAML represents the runtime environment configuration provided as a multi-line YAML string.
RuntimeEnvYAML: |
pip:
- requests==2.26.0
- pendulum==2.1.2
env_vars:
counter_name: "test_counter"

# Suspend specifies whether the RayJob controller should create a RayCluster instance.
# If a job is applied with the suspend field set to true, the RayCluster will not be created and we will wait for the transition to false.
# If the RayCluster is already created, it will be deleted. In the case of transition to false, a new RayCluste rwill be created.
# suspend: false

# rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
rayClusterSpec:
rayVersion: '2.5.0' # should match the Ray version in the image of the containers
Expand Down
41 changes: 34 additions & 7 deletions ray-operator/controllers/ray/common/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/google/shlex"
rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/yaml"
)

// GetDecodedRuntimeEnv decodes the runtime environment for the Ray job from a base64-encoded string.
Expand All @@ -21,6 +22,33 @@ func GetDecodedRuntimeEnv(runtimeEnv string) (string, error) {
return string(decodedBytes), nil
}

// GetRuntimeEnvJson returns the JSON string of the runtime environment for the Ray job.
func getRuntimeEnvJson(rayJobInstance *rayv1alpha1.RayJob) (string, error) {
runtimeEnv := rayJobInstance.Spec.RuntimeEnv
runtimeEnvYAML := rayJobInstance.Spec.RuntimeEnvYAML

// Check if both runtimeEnv and RuntimeEnvYAML are specified.
if len(runtimeEnv) > 0 && len(runtimeEnvYAML) > 0 {
return "", fmt.Errorf("Both runtimeEnv and RuntimeEnvYAML are specified. Please specify only one of the fields.")
}

if len(runtimeEnv) > 0 {
return GetDecodedRuntimeEnv(runtimeEnv)
kevin85421 marked this conversation as resolved.
Show resolved Hide resolved
}

if len(runtimeEnvYAML) > 0 {
// Convert YAML to JSON
jsonData, err := yaml.YAMLToJSON([]byte(runtimeEnvYAML))
if err != nil {
return "", err
}
// We return the JSON as a string
return string(jsonData), nil
}

return "", nil
}

// GetBaseRayJobCommand returns the first part of the Ray Job command up to and including the address, e.g. "ray job submit --address http://..."
func GetBaseRayJobCommand(address string) []string {
// add http:// if needed
Expand Down Expand Up @@ -54,19 +82,18 @@ func GetMetadataJson(metadata map[string]string, rayVersion string) (string, err
// GetK8sJobCommand builds the K8s job command for the Ray job.
func GetK8sJobCommand(rayJobInstance *rayv1alpha1.RayJob) ([]string, error) {
address := rayJobInstance.Status.DashboardURL
runtimeEnv := rayJobInstance.Spec.RuntimeEnv
metadata := rayJobInstance.Spec.Metadata
jobId := rayJobInstance.Status.JobId
entrypoint := rayJobInstance.Spec.Entrypoint

k8sJobCommand := GetBaseRayJobCommand(address)

if len(runtimeEnv) > 0 {
runtimeEnvDecoded, err := GetDecodedRuntimeEnv(runtimeEnv)
if err != nil {
return nil, err
}
k8sJobCommand = append(k8sJobCommand, "--runtime-env-json", runtimeEnvDecoded)
runtimeEnvJson, err := getRuntimeEnvJson(rayJobInstance)
if err != nil {
return nil, err
}
if len(runtimeEnvJson) > 0 {
k8sJobCommand = append(k8sJobCommand, "--runtime-env-json", runtimeEnvJson)
}

if len(metadata) > 0 {
Expand Down
101 changes: 101 additions & 0 deletions ray-operator/controllers/ray/common/job_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common

import (
"encoding/json"
"testing"

rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
Expand Down Expand Up @@ -30,6 +31,49 @@ func TestGetDecodedRuntimeEnv(t *testing.T) {
assert.Equal(t, `{"test":"test"}`, decoded)
}

func TestGetRuntimeEnvJsonFromBase64(t *testing.T) {
expected := `{"test":"test"}`
jsonOutput, err := getRuntimeEnvJson(testRayJob)
assert.NoError(t, err)
assert.Equal(t, expected, jsonOutput)
}

func TestGetRuntimeEnvJsonFromYAML(t *testing.T) {
rayJobWithYAML := &rayv1alpha1.RayJob{
Spec: rayv1alpha1.RayJobSpec{
RuntimeEnvYAML: `
working_dir: "https://github.com/ray-project/serve_config_examples/archive/b393e77bbd6aba0881e3d94c05f968f05a387b96.zip"
pip: ["python-multipart==0.0.6"]
`,
},
}
expectedJSON := `{"working_dir":"https://github.com/ray-project/serve_config_examples/archive/b393e77bbd6aba0881e3d94c05f968f05a387b96.zip","pip":["python-multipart==0.0.6"]}`
jsonOutput, err := getRuntimeEnvJson(rayJobWithYAML)
assert.NoError(t, err)

var expectedMap map[string]interface{}
var actualMap map[string]interface{}

// Convert the JSON strings into map types to avoid errors due to ordering
assert.NoError(t, json.Unmarshal([]byte(expectedJSON), &expectedMap))
assert.NoError(t, json.Unmarshal([]byte(jsonOutput), &actualMap))

// Now compare the maps
assert.Equal(t, expectedMap, actualMap)
}

func TestGetRuntimeEnvJsonErrorWithBothFields(t *testing.T) {
rayJobWithBoth := &rayv1alpha1.RayJob{
Spec: rayv1alpha1.RayJobSpec{
RuntimeEnv: "eyJ0ZXN0IjoidGVzdCJ9",
RuntimeEnvYAML: `pip: ["python-multipart==0.0.6"]`,
},
}
_, err := getRuntimeEnvJson(rayJobWithBoth)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Both runtimeEnv and RuntimeEnvYAML are specified. Please specify only one of the fields.")
}

func TestGetBaseRayJobCommand(t *testing.T) {
expected := []string{"ray", "job", "submit", "--address", "http://127.0.0.1:8265"}
command := GetBaseRayJobCommand(testRayJob.Status.DashboardURL)
Expand Down Expand Up @@ -57,6 +101,63 @@ func TestGetK8sJobCommand(t *testing.T) {
assert.Equal(t, expected, command)
}

func TestGetK8sJobCommandWithYAML(t *testing.T) {
rayJobWithYAML := &rayv1alpha1.RayJob{
Spec: rayv1alpha1.RayJobSpec{
RuntimeEnvYAML: `
working_dir: "https://github.com/ray-project/serve_config_examples/archive/b393e77bbd6aba0881e3d94c05f968f05a387b96.zip"
pip: ["python-multipart==0.0.6"]
`,
Metadata: map[string]string{
"testKey": "testValue",
},
RayClusterSpec: &rayv1alpha1.RayClusterSpec{
RayVersion: "2.6.0",
},
Entrypoint: "echo hello",
},
Status: rayv1alpha1.RayJobStatus{
DashboardURL: "http://127.0.0.1:8265",
JobId: "testJobId",
},
}
expected := []string{
"ray", "job", "submit", "--address", "http://127.0.0.1:8265",
"--runtime-env-json", `{"working_dir":"https://github.com/ray-project/serve_config_examples/archive/b393e77bbd6aba0881e3d94c05f968f05a387b96.zip","pip":["python-multipart==0.0.6"]}`,
"--metadata-json", `{"testKey":"testValue"}`,
"--submission-id", "testJobId",
"--",
"echo", "hello",
}
command, err := GetK8sJobCommand(rayJobWithYAML)
assert.NoError(t, err)

// Ensure the slices are the same length.
assert.Equal(t, len(expected), len(command))

for i := 0; i < len(expected); i++ {
if expected[i] == "--runtime-env-json" {
// Decode the JSON string from the next element.
var expectedMap, actualMap map[string]interface{}
err1 := json.Unmarshal([]byte(expected[i+1]), &expectedMap)
err2 := json.Unmarshal([]byte(command[i+1]), &actualMap)

// If there's an error decoding either JSON string, it's an error in the test.
assert.NoError(t, err1)
assert.NoError(t, err2)

// Compare the maps directly to avoid errors due to ordering.
assert.Equal(t, expectedMap, actualMap)

// Skip the next element because we've just checked it.
i++
} else {
// For non-JSON elements, compare them directly.
assert.Equal(t, expected[i], command[i])
}
}
}

func TestMetadataRaisesErrorBeforeRay26(t *testing.T) {
rayJob := &rayv1alpha1.RayJob{
Spec: rayv1alpha1.RayJobSpec{
Expand Down
5 changes: 5 additions & 0 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,11 @@ func (r *RayJobReconciler) getSubmitterTemplate(rayJobInstance *rayv1alpha1.RayJ

// If the command in the submitter pod template isn't set, use the default command.
if len(submitterTemplate.Spec.Containers[0].Command) == 0 {
// Check for deprecated 'runtimeEnv' field usage and log a warning.
if len(rayJobInstance.Spec.RuntimeEnv) > 0 {
r.Log.Info("Warning: The 'runtimeEnv' field is deprecated. Please use 'runtimeEnvYAML' instead.")
}

k8sJobCommand, err := common.GetK8sJobCommand(rayJobInstance)
if err != nil {
return v1.PodTemplateSpec{}, err
Expand Down
2 changes: 1 addition & 1 deletion ray-operator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
k8s.io/code-generator v0.23.0
k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b
sigs.k8s.io/controller-runtime v0.11.1
sigs.k8s.io/yaml v1.3.0
volcano.sh/apis v1.6.0-alpha.0.0.20221012070524-685db38b4fae
)

Expand Down Expand Up @@ -84,5 +85,4 @@ require (
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect
sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.0 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)