Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replacement for https://github.com/ray-project/kuberay/pull/1312 #1409

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apiserver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/prometheus/client_golang v1.11.1
github.com/ray-project/kuberay/proto v0.0.0-20220703232803-3e7749d17400
github.com/ray-project/kuberay/ray-operator v0.0.0-20220703232803-3e7749d17400
github.com/stretchr/testify v1.8.4
google.golang.org/grpc v1.40.0
google.golang.org/protobuf v1.27.1
k8s.io/api v0.23.0
Expand Down Expand Up @@ -47,6 +48,7 @@ require (
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.28.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
Expand Down
3 changes: 2 additions & 1 deletion apiserver/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
Expand Down
21 changes: 17 additions & 4 deletions apiserver/pkg/manager/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ func (r *ResourceManager) CreateCluster(ctx context.Context, apiCluster *api.Clu
}

// convert *api.Cluster to v1alpha1.RayCluster
rayCluster := util.NewRayCluster(apiCluster, computeTemplateDict)
rayCluster, err := util.NewRayCluster(apiCluster, computeTemplateDict)
if err != nil {
return nil, util.NewInvalidInputErrorWithDetails(err, "Failed to create a Ray cluster")
}

// set our own fields.
clusterAt := r.clientManager.Time().Now().String()
Expand All @@ -105,6 +108,7 @@ func (r *ResourceManager) CreateCluster(ctx context.Context, apiCluster *api.Clu
return newRayCluster, nil
}

// Compute template
func (r *ResourceManager) populateComputeTemplate(ctx context.Context, clusterSpec *api.ClusterSpec, nameSpace string) (map[string]*api.ComputeTemplate, error) {
dict := map[string]*api.ComputeTemplate{}
// populate head compute template
Expand Down Expand Up @@ -216,7 +220,10 @@ func (r *ResourceManager) CreateJob(ctx context.Context, apiJob *api.RayJob) (*v
}

// convert *api.Cluster to v1alpha1.RayCluster
rayJob := util.NewRayJob(apiJob, computeTemplateMap)
rayJob, err := util.NewRayJob(apiJob, computeTemplateMap)
if err != nil {
return nil, util.NewInvalidInputErrorWithDetails(err, "Failed to create a Ray Job")
}

newRayJob, err := r.getRayJobClient(apiJob.Namespace).Create(ctx, rayJob.Get(), metav1.CreateOptions{})
if err != nil {
Expand Down Expand Up @@ -303,7 +310,10 @@ func (r *ResourceManager) CreateService(ctx context.Context, apiService *api.Ray
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to populate compute template for (%s/%s)", apiService.Namespace, apiService.Name)
}
rayService := util.NewRayService(apiService, computeTemplateDict)
rayService, err := util.NewRayService(apiService, computeTemplateDict)
if err != nil {
return nil, util.NewInvalidInputErrorWithDetails(err, "Failed to create a Ray Service")
}
createdAt := r.clientManager.Time().Now().String()
rayService.Annotations["ray.io/creation-timestamp"] = createdAt
newRayService, err := r.getRayServiceClient(apiService.Namespace).Create(ctx, rayService.Get(), metav1.CreateOptions{})
Expand All @@ -327,7 +337,10 @@ func (r *ResourceManager) UpdateRayService(ctx context.Context, apiService *api.
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to populate compute template for (%s/%s)", apiService.Namespace, apiService.Name)
}
rayService := util.NewRayService(apiService, computeTemplateDict)
rayService, err := util.NewRayService(apiService, computeTemplateDict)
if err != nil {
return nil, err
}
rayService.Annotations["ray.io/update-timestamp"] = r.clientManager.Time().Now().String()
rayService.ResourceVersion = oldService.DeepCopy().ResourceVersion
newRayService, err := client.Update(ctx, rayService.Get(), metav1.UpdateOptions{})
Expand Down
10 changes: 10 additions & 0 deletions apiserver/pkg/model/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func FromCrdToApiCluster(cluster *v1alpha1.RayCluster, events []v1.Event) *api.C
ClusterState: string(cluster.Status.State),
}

if len(cluster.ObjectMeta.Annotations) > 0 {
pbCluster.Annotations = cluster.ObjectMeta.Annotations
}

// loop container and find the resource
pbCluster.ClusterSpec = PopulateRayClusterSpec(cluster.Spec)

Expand Down Expand Up @@ -146,6 +150,7 @@ func PopulateHeadNodeSpec(spec v1alpha1.HeadGroupSpec) *api.HeadGroupSpec {
ServiceType: string(spec.ServiceType),
Image: spec.Template.Annotations[util.RayClusterImageAnnotationKey],
ComputeTemplate: spec.Template.Annotations[util.RayClusterComputeTemplateAnnotationKey],
Volumes: PopulateVolumes(&spec.Template),
}

for _, annotation := range getNodeDefaultAnnotations() {
Expand All @@ -162,6 +167,10 @@ func PopulateHeadNodeSpec(spec v1alpha1.HeadGroupSpec) *api.HeadGroupSpec {
headNodeSpec.Labels = spec.Template.Labels
}

if spec.EnableIngress != nil && *spec.EnableIngress {
headNodeSpec.EnableIngress = true
}

// Here we update environment only for a container named 'ray-head'
if container, _, ok := util.GetContainerByName(spec.Template.Spec.Containers, "ray-head"); ok && len(container.Env) > 0 {
env := make(map[string]string)
Expand Down Expand Up @@ -196,6 +205,7 @@ func PopulateWorkerNodeSpec(specs []v1alpha1.WorkerGroupSpec) []*api.WorkerGroup
GroupName: spec.GroupName,
Image: spec.Template.Annotations[util.RayClusterImageAnnotationKey],
ComputeTemplate: spec.Template.Annotations[util.RayClusterComputeTemplateAnnotationKey],
Volumes: PopulateVolumes(&spec.Template),
}

for _, annotation := range getNodeDefaultAnnotations() {
Expand Down
32 changes: 29 additions & 3 deletions apiserver/pkg/model/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

var (
enableIngress = false
enableIngress = true
headNodeReplicas int32 = 1
workerReplicas int32 = 5
)
Expand Down Expand Up @@ -62,7 +62,7 @@ var headSpecTest = v1alpha1.HeadGroupSpec{
Containers: []v1.Container{
{
Name: "ray-head",
Image: "blublinsky1/ray310:2.6.3",
Image: "blublinsky1/ray310:2.5.0",
Env: []v1.EnvVar{
{
Name: "AWS_KEY",
Expand Down Expand Up @@ -162,7 +162,7 @@ var workerSpecTest = v1alpha1.WorkerGroupSpec{
Containers: []v1.Container{
{
Name: "ray-worker",
Image: "blublinsky1/ray310:2.6.3",
Image: "blublinsky1/ray310:2.5.0",
Env: []v1.EnvVar{
{
Name: "AWS_KEY",
Expand Down Expand Up @@ -195,6 +195,22 @@ var workerSpecTest = v1alpha1.WorkerGroupSpec{
},
}

var ClusterSpecTest = v1alpha1.RayCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "raycluster-sample",
Namespace: "default",
Annotations: map[string]string{
"kubernetes.io/ingress.class": "nginx",
},
},
Spec: v1alpha1.RayClusterSpec{
HeadGroupSpec: headSpecTest,
WorkerGroupSpecs: []v1alpha1.WorkerGroupSpec{
workerSpecTest,
},
},
}

var expectedAnnotations = map[string]string{
"custom": "value",
}
Expand All @@ -220,6 +236,9 @@ func TestPopulateHeadNodeSpec(t *testing.T) {
if groupSpec.ServiceAccount != "account" {
t.Errorf("failed to convert service account")
}
if groupSpec.EnableIngress != *headSpecTest.EnableIngress {
t.Errorf("failed to convert enableIngress")
}
if groupSpec.ImagePullSecret != "foo" {
t.Errorf("failed to convert image pull secret")
}
Expand Down Expand Up @@ -254,6 +273,13 @@ func TestPopulateWorkerNodeSpec(t *testing.T) {
}
}

func TestPopulateRayClusterSpec(t *testing.T) {
cluster := FromCrdToApiCluster(&ClusterSpecTest, []v1.Event{})
if len(cluster.Annotations) != 1 {
t.Errorf("failed to convert cluster's annotations")
}
}

func TestPopulateTemplate(t *testing.T) {
template := FromKubeToAPIComputeTemplate(&configMapWithoutTolerations)
if len(template.Tolerations) != 0 {
Expand Down
103 changes: 103 additions & 0 deletions apiserver/pkg/model/volumes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package model

import (
api "github.com/ray-project/kuberay/proto/go_client"
v1 "k8s.io/api/core/v1"
)

func PopulateVolumes(podTemplate *v1.PodTemplateSpec) []*api.Volume {
if len(podTemplate.Spec.Volumes) == 0 {
return nil
}
var volumes []*api.Volume
for _, vol := range podTemplate.Spec.Volumes {
mount := GetVolumeMount(podTemplate, vol.Name)
if vol.VolumeSource.HostPath != nil {
// Host Path
volumes = append(volumes, &api.Volume{
Name: vol.Name,
MountPath: mount.MountPath,
Source: vol.VolumeSource.HostPath.Path,
MountPropagationMode: GetVolumeMountPropagation(mount),
VolumeType: api.Volume_VolumeType(api.Volume_HOSTTOCONTAINER),
HostPathType: GetVolumeHostPathType(&vol),
})
continue

}
if vol.VolumeSource.PersistentVolumeClaim != nil {
// PVC
volumes = append(volumes, &api.Volume{
Name: vol.Name,
MountPath: mount.MountPath,
MountPropagationMode: GetVolumeMountPropagation(mount),
VolumeType: api.Volume_PERSISTENT_VOLUME_CLAIM,
ReadOnly: vol.VolumeSource.PersistentVolumeClaim.ReadOnly,
})
continue
}
if vol.VolumeSource.Ephemeral != nil {
// Ephimeral
request := vol.VolumeSource.Ephemeral.VolumeClaimTemplate.Spec.Resources.Requests[v1.ResourceStorage]
volume := api.Volume{
Name: vol.Name,
MountPath: mount.MountPath,
MountPropagationMode: GetVolumeMountPropagation(mount),
VolumeType: api.Volume_EPHEMERAL,
AccessMode: GetAccessMode(&vol),
Storage: request.String(),
}
if vol.VolumeSource.Ephemeral.VolumeClaimTemplate.Spec.StorageClassName != nil {
volume.StorageClassName = *vol.VolumeSource.Ephemeral.VolumeClaimTemplate.Spec.StorageClassName
}
volumes = append(volumes, &volume)
continue
}
}
return volumes
}

func GetVolumeMount(podTemplate *v1.PodTemplateSpec, vol string) *v1.VolumeMount {
for _, container := range podTemplate.Spec.Containers {
for _, mount := range container.VolumeMounts {
if mount.Name == vol {
return &mount
}
}
}
return nil
}

func GetVolumeMountPropagation(mount *v1.VolumeMount) api.Volume_MountPropagationMode {
if mount.MountPropagation == nil {
return api.Volume_NONE
}
if *mount.MountPropagation == v1.MountPropagationHostToContainer {
return api.Volume_HOSTTOCONTAINER
}
if *mount.MountPropagation == v1.MountPropagationBidirectional {
return api.Volume_BIDIRECTIONAL
}
return api.Volume_NONE
}

func GetVolumeHostPathType(vol *v1.Volume) api.Volume_HostPathType {
if *vol.VolumeSource.HostPath.Type == v1.HostPathFile {
return api.Volume_FILE
}
return api.Volume_DIRECTORY
}

func GetAccessMode(vol *v1.Volume) api.Volume_AccessMode {
modes := vol.VolumeSource.Ephemeral.VolumeClaimTemplate.Spec.AccessModes
if len(modes) == 0 {
return api.Volume_RWO
}
if modes[0] == v1.ReadOnlyMany {
return api.Volume_ROX
}
if modes[0] == v1.ReadWriteMany {
return api.Volume_RWX
}
return api.Volume_RWO
}
Loading