Skip to content

Commit

Permalink
[Bug][RayJob] RayJob with custom head service name (ray-project#1332)
Browse files Browse the repository at this point in the history
RayJob with custom head service name
  • Loading branch information
kevin85421 authored Aug 16, 2023
1 parent e11701d commit 23fd92b
Show file tree
Hide file tree
Showing 12 changed files with 255 additions and 30 deletions.
115 changes: 115 additions & 0 deletions ray-operator/config/samples/ray-job.custom-head-svc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# This YAML file is primarily for testing purposes. It specifies a custom head service name for the
# RayCluster associated with the RayJob. The head service allows worker Pods to connect to the head
# Pod and enables the Kubernetes Job, owned by the RayJob, to submit tasks to the RayCluster.
apiVersion: ray.io/v1alpha1
kind: RayJob
metadata:
name: rayjob-sample
spec:
entrypoint: python /home/ray/samples/sample_code.py
runtimeEnv: ewogICAgInBpcCI6IFsKICAgICAgICAicmVxdWVzdHM9PTIuMjYuMCIsCiAgICAgICAgInBlbmR1bHVtPT0yLjEuMiIKICAgIF0sCiAgICAiZW52X3ZhcnMiOiB7ImNvdW50ZXJfbmFtZSI6ICJ0ZXN0X2NvdW50ZXIifQp9Cg==
# rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
rayClusterSpec:
rayVersion: '2.6.3' # should match the Ray version in the image of the containers
# Ray head pod template
headGroupSpec:
headService:
metadata:
name: custom-ray-head-service-name
# The `rayStartParams` are used to configure the `ray start` command.
# See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
# See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
rayStartParams:
dashboard-host: '0.0.0.0'
#pod template
template:
spec:
containers:
- name: ray-head
image: rayproject/ray:2.6.3
ports:
- containerPort: 6379
name: gcs-server
- containerPort: 8265 # Ray dashboard
name: dashboard
- containerPort: 10001
name: client
resources:
limits:
cpu: "1"
requests:
cpu: "1"
volumeMounts:
- mountPath: /home/ray/samples
name: code-sample
volumes:
- name: code-sample
configMap:
name: ray-job-code-sample
items:
- key: sample_code.py
path: sample_code.py
workerGroupSpecs:
# Number of worker Pod replicas in this worker group.
- replicas: 1
minReplicas: 1
maxReplicas: 5
# Logical group name (here, small-group); the name can also describe the group's function.
groupName: small-group
# The `rayStartParams` are used to configure the `ray start` command.
# See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
# See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
rayStartParams: {}
#pod template
template:
spec:
containers:
- name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name', or '123-abc')
image: rayproject/ray:2.6.3
lifecycle:
preStop:
exec:
command: [ "/bin/sh","-c","ray stop" ]
resources:
limits:
cpu: "1"
requests:
cpu: "1"
######################Ray code sample#################################
# this sample is from https://docs.ray.io/en/latest/cluster/job-submission.html#quick-start-example
# it is mounted into the container and executed to show the Ray job at work
---
apiVersion: v1
kind: ConfigMap
metadata:
name: ray-job-code-sample
data:
sample_code.py: |
import ray
import os
import requests
ray.init()
@ray.remote
class Counter:
def __init__(self):
# Used to verify runtimeEnv
self.name = os.getenv("counter_name")
assert self.name == "test_counter"
self.counter = 0
def inc(self):
self.counter += 1
def get_counter(self):
return "{} got {}".format(self.name, self.counter)
counter = Counter.remote()
for _ in range(5):
ray.get(counter.inc.remote())
print(ray.get(counter.get_counter.remote()))
# Verify that the correct runtime env was used for the job.
assert requests.__version__ == "2.26.0"
14 changes: 12 additions & 2 deletions ray-operator/controllers/ray/common/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,18 @@ func BuildIngressForHeadService(cluster rayv1alpha1.RayCluster) (*networkingv1.I
if port, ok := servicePorts["dashboard"]; ok {
dashboardPort = port
}

headSvcName, err := utils.GenerateHeadServiceName(utils.RayClusterCRD, cluster.Spec, cluster.Name)
if err != nil {
return nil, err
}
paths = []networkingv1.HTTPIngressPath{
{
Path: "/" + cluster.Name + "/(.*)",
PathType: &pathType,
Backend: networkingv1.IngressBackend{
Service: &networkingv1.IngressServiceBackend{
Name: utils.GenerateServiceName(cluster.Name),
Name: headSvcName,
Port: networkingv1.ServiceBackendPort{
Number: dashboardPort,
},
Expand Down Expand Up @@ -100,7 +105,12 @@ func BuildIngressForRayService(service rayv1alpha1.RayService, cluster rayv1alph
return nil, err
}

ingress.ObjectMeta.Name = utils.GenerateServiceName(service.Name)
headSvcName, err := utils.GenerateHeadServiceName(utils.RayServiceCRD, service.Spec.RayClusterSpec, service.Name)
if err != nil {
return nil, err
}

ingress.ObjectMeta.Name = headSvcName
ingress.ObjectMeta.Namespace = service.Namespace
ingress.ObjectMeta.Labels = map[string]string{
RayServiceLabelKey: service.Name,
Expand Down
4 changes: 3 additions & 1 deletion ray-operator/controllers/ray/common/ingress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,11 @@ func TestBuildIngressForHeadService(t *testing.T) {

// path names
paths := ingress.Spec.Rules[0].IngressRuleValue.HTTP.Paths
headSvcName, err := utils.GenerateHeadServiceName(utils.RayClusterCRD, instanceWithIngressEnabled.Spec, instanceWithIngressEnabled.Name)
assert.Nil(t, err)
for _, path := range paths {
actualResult = path.Backend.Service.Name
expectedResult = utils.GenerateServiceName(instanceWithIngressEnabled.Name)
expectedResult = headSvcName

if !reflect.DeepEqual(expectedResult, actualResult) {
t.Fatalf("Expected `%v` but got `%v`", expectedResult, actualResult)
Expand Down
12 changes: 6 additions & 6 deletions ray-operator/controllers/ray/common/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func TestBuildPod(t *testing.T) {
// testing worker pod
worker := cluster.Spec.WorkerGroupSpecs[0]
podName = cluster.Name + DashSymbol + string(rayv1alpha1.WorkerNode) + DashSymbol + worker.GroupName + DashSymbol + utils.FormatInt32(0)
fqdnRayIP := utils.GenerateFQDNServiceName(cluster.Name, cluster.Namespace)
fqdnRayIP := utils.GenerateFQDNServiceName(*cluster, cluster.Namespace)
podTemplateSpec = DefaultWorkerPodTemplate(*cluster, worker, podName, fqdnRayIP, "6379")
pod = BuildPod(podTemplateSpec, rayv1alpha1.WorkerNode, worker.RayStartParams, "6379", nil, "", fqdnRayIP)

Expand Down Expand Up @@ -549,7 +549,7 @@ func TestBuildPod_WithGcsFtEnabled(t *testing.T) {
// Build a worker pod
worker := cluster.Spec.WorkerGroupSpecs[0]
podName = cluster.Name + DashSymbol + string(rayv1alpha1.WorkerNode) + DashSymbol + worker.GroupName + DashSymbol + utils.FormatInt32(0)
fqdnRayIP := utils.GenerateFQDNServiceName(cluster.Name, cluster.Namespace)
fqdnRayIP := utils.GenerateFQDNServiceName(*cluster, cluster.Namespace)
podTemplateSpec = DefaultWorkerPodTemplate(*cluster, worker, podName, fqdnRayIP, "6379")
pod = BuildPod(podTemplateSpec, rayv1alpha1.WorkerNode, worker.RayStartParams, "6379", nil, "", fqdnRayIP)

Expand Down Expand Up @@ -799,7 +799,7 @@ func TestCleanupInvalidVolumeMounts(t *testing.T) {

func TestDefaultWorkerPodTemplateWithName(t *testing.T) {
cluster := instance.DeepCopy()
fqdnRayIP := utils.GenerateFQDNServiceName(cluster.Name, cluster.Namespace)
fqdnRayIP := utils.GenerateFQDNServiceName(*cluster, cluster.Namespace)
worker := cluster.Spec.WorkerGroupSpecs[0]
worker.Template.ObjectMeta.Name = "ray-worker-test"
podName := cluster.Name + DashSymbol + string(rayv1alpha1.WorkerNode) + DashSymbol + worker.GroupName + DashSymbol + utils.FormatInt32(0)
Expand Down Expand Up @@ -851,7 +851,7 @@ func TestDefaultWorkerPodTemplateWithConfigurablePorts(t *testing.T) {
cluster.Spec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Ports = []v1.ContainerPort{}
worker := cluster.Spec.WorkerGroupSpecs[0]
podName := cluster.Name + DashSymbol + string(rayv1alpha1.WorkerNode) + DashSymbol + worker.GroupName + DashSymbol + utils.FormatInt32(0)
fqdnRayIP := utils.GenerateFQDNServiceName(cluster.Name, cluster.Namespace)
fqdnRayIP := utils.GenerateFQDNServiceName(*cluster, cluster.Namespace)
podTemplateSpec := DefaultWorkerPodTemplate(*cluster, worker, podName, fqdnRayIP, "6379")
// DefaultWorkerPodTemplate will add the default metrics port if user doesn't specify it.
// Verify the default metrics port exists.
Expand All @@ -874,7 +874,7 @@ func TestDefaultWorkerPodTemplateWithConfigurablePorts(t *testing.T) {
func TestDefaultInitContainer(t *testing.T) {
// A default init container to check the health of GCS is expected to be added.
cluster := instance.DeepCopy()
fqdnRayIP := utils.GenerateFQDNServiceName(cluster.Name, cluster.Namespace)
fqdnRayIP := utils.GenerateFQDNServiceName(*cluster, cluster.Namespace)
worker := cluster.Spec.WorkerGroupSpecs[0]
podName := cluster.Name + DashSymbol + string(rayv1alpha1.WorkerNode) + DashSymbol + worker.GroupName + DashSymbol + utils.FormatInt32(0)
expectedResult := len(cluster.Spec.WorkerGroupSpecs[0].Template.Spec.InitContainers) + 1
Expand Down Expand Up @@ -908,7 +908,7 @@ func TestDefaultInitContainer(t *testing.T) {

func TestDefaultInitContainerImagePullPolicy(t *testing.T) {
cluster := instance.DeepCopy()
fqdnRayIP := utils.GenerateFQDNServiceName(cluster.Name, cluster.Namespace)
fqdnRayIP := utils.GenerateFQDNServiceName(*cluster, cluster.Namespace)
worker := cluster.Spec.WorkerGroupSpecs[0]
podName := cluster.Name + DashSymbol + string(rayv1alpha1.WorkerNode) + DashSymbol + worker.GroupName + DashSymbol + utils.FormatInt32(0)

Expand Down
12 changes: 10 additions & 2 deletions ray-operator/controllers/ray/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ func BuildServiceForHeadPod(cluster rayv1alpha1.RayCluster, labels map[string]st
annotations = make(map[string]string)
}

default_name := utils.GenerateServiceName(cluster.Name)
default_name, err := utils.GenerateHeadServiceName(utils.RayClusterCRD, cluster.Spec, cluster.Name)
if err != nil {
return nil, err
}
default_namespace := cluster.Namespace
default_type := cluster.Spec.HeadGroupSpec.ServiceType

Expand Down Expand Up @@ -132,7 +135,12 @@ func BuildHeadServiceForRayService(rayService rayv1alpha1.RayService, rayCluster
return nil, err
}

service.ObjectMeta.Name = utils.GenerateServiceName(rayService.Name)
headSvcName, err := utils.GenerateHeadServiceName(utils.RayServiceCRD, rayService.Spec.RayClusterSpec, rayService.Name)
if err != nil {
return nil, err
}

service.ObjectMeta.Name = headSvcName
service.ObjectMeta.Namespace = rayService.Namespace
service.ObjectMeta.Labels = map[string]string{
RayServiceLabelKey: rayService.Name,
Expand Down
9 changes: 5 additions & 4 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,8 +754,8 @@ func (r *RayClusterReconciler) createWorkerPod(ctx context.Context, instance ray
// Build head instance pod(s).
func (r *RayClusterReconciler) buildHeadPod(instance rayv1alpha1.RayCluster) corev1.Pod {
podName := strings.ToLower(instance.Name + common.DashSymbol + string(rayv1alpha1.HeadNode) + common.DashSymbol)
podName = utils.CheckName(podName) // making sure the name is valid
fqdnRayIP := utils.GenerateFQDNServiceName(instance.Name, instance.Namespace) // Fully Qualified Domain Name
podName = utils.CheckName(podName) // making sure the name is valid
fqdnRayIP := utils.GenerateFQDNServiceName(instance, instance.Namespace) // Fully Qualified Domain Name
// The Ray head port used by workers to connect to the cluster (GCS server port for Ray >= 1.11.0, Redis port for older Ray.)
headPort := common.GetHeadPort(instance.Spec.HeadGroupSpec.RayStartParams)
autoscalingEnabled := instance.Spec.EnableInTreeAutoscaling
Expand Down Expand Up @@ -787,8 +787,9 @@ func getCreator(instance rayv1alpha1.RayCluster) string {
// Build worker instance pods.
func (r *RayClusterReconciler) buildWorkerPod(instance rayv1alpha1.RayCluster, worker rayv1alpha1.WorkerGroupSpec) corev1.Pod {
podName := strings.ToLower(instance.Name + common.DashSymbol + string(rayv1alpha1.WorkerNode) + common.DashSymbol + worker.GroupName + common.DashSymbol)
podName = utils.CheckName(podName) // making sure the name is valid
fqdnRayIP := utils.GenerateFQDNServiceName(instance.Name, instance.Namespace) // Fully Qualified Domain Name
podName = utils.CheckName(podName) // making sure the name is valid
fqdnRayIP := utils.GenerateFQDNServiceName(instance, instance.Namespace) // Fully Qualified Domain Name

// The Ray head port used by workers to connect to the cluster (GCS server port for Ray >= 1.11.0, Redis port for older Ray.)
headPort := common.GetHeadPort(instance.Spec.HeadGroupSpec.RayStartParams)
autoscalingEnabled := instance.Spec.EnableInTreeAutoscaling
Expand Down
4 changes: 3 additions & 1 deletion ray-operator/controllers/ray/rayservice_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,10 @@ applications:

It("should create a new head service resource", func() {
svc := &corev1.Service{}
headSvcName, err := utils.GenerateHeadServiceName(utils.RayServiceCRD, myRayService.Spec.RayClusterSpec, myRayService.Name)
Expect(err).To(BeNil(), "failed to generate head service name")
Eventually(
getResourceFunc(ctx, client.ObjectKey{Name: utils.GenerateServiceName(myRayService.Name), Namespace: "default"}, svc),
getResourceFunc(ctx, client.ObjectKey{Name: headSvcName, Namespace: "default"}, svc),
time.Second*15, time.Millisecond*500).Should(BeNil(), "My head service = %v", svc)
Expect(svc.Spec.Selector[common.RayIDLabelKey]).Should(Equal(utils.GenerateIdentifier(myRayCluster.Name, rayv1alpha1.HeadNode)))
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,12 @@ func TestFetchHeadServiceURL(t *testing.T) {
Namespace: namespace,
},
}

headSvcName, err := utils.GenerateHeadServiceName(utils.RayClusterCRD, cluster.Spec, cluster.Name)
assert.Nil(t, err, "Fail to generate head service name")
headSvc := corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: utils.GenerateServiceName(cluster.Name),
Name: headSvcName,
Namespace: cluster.ObjectMeta.Namespace,
},
Spec: corev1.ServiceSpec{
Expand Down
9 changes: 7 additions & 2 deletions ray-operator/controllers/ray/utils/dashboard_httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,13 @@ type RayDashboardClient struct {
// and the port with the given port name (defaultPortName).
func FetchHeadServiceURL(ctx context.Context, log *logr.Logger, cli client.Client, rayCluster *rayv1alpha1.RayCluster, defaultPortName string) (string, error) {
headSvc := &corev1.Service{}
headSvcName := GenerateServiceName(rayCluster.Name)
if err := cli.Get(ctx, client.ObjectKey{Name: headSvcName, Namespace: rayCluster.Namespace}, headSvc); err != nil {
headSvcName, err := GenerateHeadServiceName(RayClusterCRD, rayCluster.Spec, rayCluster.Name)
if err != nil {
log.Error(err, "Failed to generate head service name", "RayCluster name", rayCluster.Name, "RayCluster spec", rayCluster.Spec)
return "", err
}

if err = cli.Get(ctx, client.ObjectKey{Name: headSvcName, Namespace: rayCluster.Namespace}, headSvc); err != nil {
if errors.IsNotFound(err) {
log.Error(err, "Head service is not found", "head service name", headSvcName, "namespace", rayCluster.Namespace)
}
Expand Down
50 changes: 40 additions & 10 deletions ray-operator/controllers/ray/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ const (
DefaultDomainName = "cluster.local"
)

// CRDType identifies which kind of custom resource owns a given object.
// GenerateHeadServiceName switches on it to decide how the head service
// name is derived.
//
// TODO (kevin85421): Define CRDType here rather than constant.go to avoid circular dependency.
type CRDType string

// Known CRDType values. The string values mirror the names of the
// corresponding custom resources.
const (
RayClusterCRD CRDType = "RayCluster"
RayJobCRD CRDType = "RayJob"
RayServiceCRD CRDType = "RayService"
)

// GetClusterDomainName returns cluster's domain name
func GetClusterDomainName() string {
if domain := os.Getenv(ClusterDomainEnvKey); len(domain) > 0 {
Expand Down Expand Up @@ -129,14 +138,40 @@ func GetNamespace(metaData metav1.ObjectMeta) string {
return metaData.Namespace
}

// GenerateServiceName generates a Ray head service name from cluster name
func GenerateServiceName(clusterName string) string {
return CheckName(fmt.Sprintf("%s-%s-%s", clusterName, rayv1alpha1.HeadNode, "svc"))
// GenerateHeadServiceName returns the name of the Ray head service owned by a
// custom resource of the given type.
//
// The default name has the form "<ownerName>-<rayv1alpha1.HeadNode>-svc" and
// is passed through CheckName before being returned.
//
//   - RayServiceCRD: the default name is always used. A RayService keeps its
//     own head service, separate from the one owned by its RayCluster, to
//     facilitate zero-downtime upgrades, so that service is named after the
//     RayService itself.
//   - RayClusterCRD: a non-empty `HeadGroupSpec.HeadService.Name` in
//     clusterSpec takes precedence and is returned as-is; otherwise the
//     default name is used.
//   - Any other CRDType (RayJobCRD included) is rejected with an error and an
//     empty name.
//
// crdType is the type of the CR that owns the head service, clusterSpec is the
// RayClusterSpec that may carry a custom head service name, and ownerName is
// the name of the owning CR.
func GenerateHeadServiceName(crdType CRDType, clusterSpec rayv1alpha1.RayClusterSpec, ownerName string) (string, error) {
	// Reject unsupported owners up front so that nothing else runs for them.
	if crdType != RayServiceCRD && crdType != RayClusterCRD {
		return "", fmt.Errorf("unknown CRD type: %s", crdType)
	}

	defaultName := CheckName(fmt.Sprintf("%s-%s-%s", ownerName, rayv1alpha1.HeadNode, "svc"))
	if crdType == RayServiceCRD {
		return defaultName, nil
	}

	// RayCluster: a user-supplied head service name overrides the default.
	if customSvc := clusterSpec.HeadGroupSpec.HeadService; customSvc != nil && customSvc.Name != "" {
		return customSvc.Name, nil
	}
	return defaultName, nil
}

// GenerateFQDNServiceName generates a Fully Qualified Domain Name.
func GenerateFQDNServiceName(clusterName string, namespace string) string {
return fmt.Sprintf("%s.%s.svc.%s", GenerateServiceName(clusterName), namespace, GetClusterDomainName())
func GenerateFQDNServiceName(cluster rayv1alpha1.RayCluster, namespace string) string {
	// The host part of the FQDN is the RayCluster's head service name, which
	// may be a custom name taken from the cluster spec.
	svcName, genErr := GenerateHeadServiceName(RayClusterCRD, cluster.Spec, cluster.Name)
	if genErr == nil {
		// Result has the form "<head service>.<namespace>.svc.<cluster domain>".
		return fmt.Sprintf("%s.%s.svc.%s", svcName, namespace, GetClusterDomainName())
	}

	// This function has no error return, so the failure is logged and callers
	// receive an empty string instead.
	logrus.Errorf("Failed to generate head service name: %v", genErr)
	return ""
}

// ExtractRayIPFromFQDN extracts the head service name (i.e., RAY_IP, deprecated) from a fully qualified
Expand All @@ -145,11 +180,6 @@ func ExtractRayIPFromFQDN(fqdnRayIP string) string {
return strings.Split(fqdnRayIP, ".")[0]
}

// GenerateDashboardServiceName generates a ray head service name from cluster name
func GenerateDashboardServiceName(clusterName string) string {
return fmt.Sprintf("%s-%s-%s", clusterName, DashboardName, "svc")
}

// GenerateServeServiceName generates name for serve service.
func GenerateServeServiceName(serviceName string) string {
return CheckName(fmt.Sprintf("%s-%s-%s", serviceName, ServeName, "svc"))
Expand Down
Loading

0 comments on commit 23fd92b

Please sign in to comment.