Skip to content

Commit

Permalink
support update rayservice (#633)
Browse files Browse the repository at this point in the history
Support updating Ray Services using the KubeRay ApiServer.

Co-authored-by: chenyu.jiang <[email protected]>
  • Loading branch information
scarlet25151 and chenyu.jiang authored Nov 5, 2022
1 parent 4820d94 commit 599e74b
Show file tree
Hide file tree
Showing 10 changed files with 1,622 additions and 325 deletions.
54 changes: 54 additions & 0 deletions apiserver/pkg/manager/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type ResourceManagerInterface interface {
ListAllJobs(ctx context.Context) ([]*v1alpha1.RayJob, error)
DeleteJob(ctx context.Context, jobName string, namespace string) error
CreateService(ctx context.Context, apiService *api.RayService) (*v1alpha1.RayService, error)
UpdateRayService(ctx context.Context, request *api.UpdateRayServiceRequest) (*v1alpha1.RayService, error)
UpdateRayServiceConfigs(ctx context.Context, request *api.UpdateRayServiceConfigsRequest) (*v1alpha1.RayService, error)
GetService(ctx context.Context, serviceName, namespace string) error
ListServices(ctx context.Context, namespace string) ([]*v1alpha1.RayService, error)
ListAllServices(ctx context.Context) ([]*v1alpha1.RayService, error)
Expand Down Expand Up @@ -312,6 +314,58 @@ func (r *ResourceManager) CreateService(ctx context.Context, apiService *api.Ray
return newRayService, nil
}

func (r *ResourceManager) UpdateRayService(ctx context.Context, apiService *api.RayService) (*v1alpha1.RayService, error) {
name := apiService.Name
namespace := apiService.Namespace
client := r.getRayServiceClient(namespace)
oldService, err := getServiceByName(ctx, client, name)
if err != nil {
return nil, util.Wrap(err, fmt.Sprintf("Update service fail, no service named: %s ", name))
}
// populate cluster map
computeTemplateDict, err := r.populateComputeTemplate(ctx, apiService.ClusterSpec, apiService.Namespace)
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to populate compute template for (%s/%s)", apiService.Namespace, apiService.Name)
}
rayService := util.NewRayService(apiService, computeTemplateDict)
rayService.Annotations["ray.io/update-timestamp"] = r.clientManager.Time().Now().String()
rayService.ResourceVersion = oldService.DeepCopy().ResourceVersion
newRayService, err := client.Update(ctx, rayService.Get(), metav1.UpdateOptions{})
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to update service for (%s/%s)", rayService.Namespace, rayService.Name)
}
return newRayService, nil
}

func (r *ResourceManager) UpdateRayServiceConfigs(ctx context.Context, request *api.UpdateRayServiceConfigsRequest) (*v1alpha1.RayService, error) {
serviceName := request.Name
namespace := request.Namespace

client := r.getRayServiceClient(namespace)
service, err := getServiceByName(ctx, client, serviceName)
if err != nil {
return nil, util.Wrap(err, fmt.Sprintf("Update service fail, no service named: %s ", serviceName))
}
updateService := request.GetUpdateService()
// if workerGroupSpec is not nil, update worker group
if updateService.WorkerGroupUpdateSpec != nil {
oldWorkerGroups := service.Spec.RayClusterSpec.WorkerGroupSpecs
newWorkerGroups := util.UpdateRayServiceWorkerGroupSpecs(updateService.WorkerGroupUpdateSpec, oldWorkerGroups)
service.Spec.RayClusterSpec.WorkerGroupSpecs = newWorkerGroups
}
if updateService.ServeDeploymentGraphSpec != nil {
oldServeDeploymentGraphSpec := service.Spec.ServeDeploymentGraphSpec
newServeDeploymentGraphSpec := util.UpdateServeDeploymentGraphSpec(updateService.ServeDeploymentGraphSpec, oldServeDeploymentGraphSpec)
service.Spec.ServeDeploymentGraphSpec = newServeDeploymentGraphSpec
}
service.Annotations["ray.io/update-timestamp"] = r.clientManager.Time().Now().String()
newService, err := client.Update(ctx, service, metav1.UpdateOptions{})
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to update service for (%s/%s)", service.Namespace, service.Name)
}
return newService, nil
}

func (r *ResourceManager) GetService(ctx context.Context, serviceName, namespace string) (*v1alpha1.RayService, error) {
client := r.getRayServiceClient(namespace)
return getServiceByName(ctx, client, serviceName)
Expand Down
4 changes: 3 additions & 1 deletion apiserver/pkg/model/converter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package model

import (
"encoding/base64"
"fmt"
"strconv"

Expand Down Expand Up @@ -200,9 +201,10 @@ func FromCrdToApiService(service *v1alpha1.RayService, events []v1.Event) *api.R
}

func PopulateServeDeploymentGraphSpec(spec v1alpha1.ServeDeploymentGraphSpec) *api.ServeDeploymentGraphSpec {
runtimeEnv, _ := base64.StdEncoding.DecodeString(spec.RuntimeEnv)
return &api.ServeDeploymentGraphSpec{
ImportPath: spec.ImportPath,
RuntimeEnv: spec.RuntimeEnv,
RuntimeEnv: string(runtimeEnv),
ServeConfigs: PopulateServeConfig(spec.ServeConfigSpecs),
}
}
Expand Down
114 changes: 114 additions & 0 deletions apiserver/pkg/server/serve_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,38 @@ func (s *RayServiceServer) CreateRayService(ctx context.Context, request *api.Cr
return model.FromCrdToApiService(rayService, events), nil
}

func (s *RayServiceServer) UpdateRayService(ctx context.Context, request *api.UpdateRayServiceRequest) (*api.RayService, error) {
if err := ValidateUpdateServiceRequest(request); err != nil {
return nil, util.Wrap(err, "Validate update service request failed.")
}
request.Service.Namespace = request.Namespace

rayService, err := s.resourceManager.UpdateRayService(ctx, request.Service)
if err != nil {
return nil, util.Wrap(err, "Update ray service failed.")
}
events, err := s.resourceManager.GetServiceEvents(ctx, *rayService)
if err != nil {
klog.Warningf("failed to get rayService's event, service: %s/%s, err: %v", rayService.Namespace, rayService.Name, err)
}
return model.FromCrdToApiService(rayService, events), nil
}

func (s *RayServiceServer) UpdateRayServiceConfigs(ctx context.Context, request *api.UpdateRayServiceConfigsRequest) (*api.RayService, error) {
if err := ValidateUpdateRayServiceConfigsRequest(request); err != nil {
return nil, err
}
service, err := s.resourceManager.UpdateRayServiceConfigs(ctx, request)
if err != nil {
return nil, err
}
events, err := s.resourceManager.GetServiceEvents(ctx, *service)
if err != nil {
klog.Warningf("failed to get rayService's event, service: %s/%s, err: %v", service.Namespace, service.Name, err)
}
return model.FromCrdToApiService(service, events), nil
}

func (s *RayServiceServer) GetRayService(ctx context.Context, request *api.GetRayServiceRequest) (*api.RayService, error) {
if request.Name == "" {
return nil, util.NewInvalidInputError("ray service name is empty. Please specify a valid value.")
Expand Down Expand Up @@ -164,3 +196,85 @@ func ValidateCreateServiceRequest(request *api.CreateRayServiceRequest) error {

return nil
}

func ValidateUpdateServiceRequest(request *api.UpdateRayServiceRequest) error {
if request.Name == "" {
return util.NewInvalidInputError("Service name is empty. Please specify a valid value.")
}
if request.Namespace == "" {
return util.NewInvalidInputError("Namespace is empty. Please specify a valid value.")
}

if request.Service == nil {
return util.NewInvalidInputError("Service is empty, please input a valid payload.")
}

if request.Namespace != request.Service.Namespace {
return util.NewInvalidInputError("The namespace in the request is different from the namespace in the service definition.")
}

if request.Service.Name == "" {
return util.NewInvalidInputError("Service name is empty. Please specify a valid value.")
}

if request.Service.User == "" {
return util.NewInvalidInputError("User who create the Service is empty. Please specify a valid value.")
}

if len(request.Service.ClusterSpec.HeadGroupSpec.ComputeTemplate) == 0 {
return util.NewInvalidInputError("HeadGroupSpec compute template is empty. Please specify a valid value.")
}

for index, spec := range request.Service.ClusterSpec.WorkerGroupSpec {
if len(spec.GroupName) == 0 {
return util.NewInvalidInputError("WorkerNodeSpec %d group name is empty. Please specify a valid value.", index)
}
if len(spec.ComputeTemplate) == 0 {
return util.NewInvalidInputError("WorkerNodeSpec %d compute template is empty. Please specify a valid value.", index)
}
if spec.MaxReplicas == 0 {
return util.NewInvalidInputError("WorkerNodeSpec %d MaxReplicas can not be 0. Please specify a valid value.", index)
}
if spec.MinReplicas > spec.MaxReplicas {
return util.NewInvalidInputError("WorkerNodeSpec %d MinReplica > MaxReplicas. Please specify a valid value.", index)
}
}

return nil
}

func ValidateUpdateRayServiceConfigsRequest(request *api.UpdateRayServiceConfigsRequest) error {
if request.Name == "" {
return util.NewInvalidInputError("ray service name is empty. Please specify a valid value.")
}
if request.Namespace == "" {
return util.NewInvalidInputError("ray service namespace is empty. Please specify a valid value.")
}
updateServiceBody := request.GetUpdateService()
if updateServiceBody == nil || (updateServiceBody.WorkerGroupUpdateSpec == nil && updateServiceBody.ServeDeploymentGraphSpec == nil) {
return util.NewInvalidInputError("update spec is empty. Nothing to update.")
}
if updateServiceBody.WorkerGroupUpdateSpec != nil {
for _, spec := range updateServiceBody.WorkerGroupUpdateSpec {
if spec.Replicas <= 0 || spec.MinReplicas <= 0 || spec.MaxReplicas <= 0 {
return util.NewInvalidInputError("input invalid, replicas, minReplicas and maxReplicas must be greater than 0.")
}
if spec.MinReplicas > spec.MaxReplicas {
return util.NewInvalidInputError("WorkerNodeSpec %s MinReplica > MaxReplicas. Please specify a valid value.", spec.GroupName)
}
}
}
if updateServiceBody.ServeDeploymentGraphSpec != nil {
for _, spec := range updateServiceBody.ServeDeploymentGraphSpec.ServeConfigs {
if spec.Replicas <= 0 {
return util.NewInvalidInputError("input invalid, replicas must be greater than 0.")
}
if spec.ActorOptions != nil {
if spec.ActorOptions.CpusPerActor <= 0 && spec.ActorOptions.GpusPerActor <= 0 && spec.ActorOptions.MemoryPerActor <= 0 {
return util.NewInvalidInputError("input invalid, cpusPerActor, gpusPerActor and memoryPerActor must be greater than 0.")
}
}
}
}
return nil
}
68 changes: 68 additions & 0 deletions apiserver/pkg/util/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,71 @@ func buildRayServiceSpec(apiService *api.RayService, computeTemplateMap map[stri
RayClusterSpec: *buildRayClusterSpec(rayServiceDefaultVersion, nil, apiService.ClusterSpec, computeTemplateMap),
}
}

func UpdateRayServiceWorkerGroupSpecs(updateSpecs []*api.WorkerGroupUpdateSpec, workerGroupSpecs []rayalphaapi.WorkerGroupSpec) []rayalphaapi.WorkerGroupSpec {
specMap := map[string]*api.WorkerGroupUpdateSpec{}
for _, spec := range updateSpecs {
if spec != nil {
specMap[spec.GroupName] = spec
}
}
for i, spec := range workerGroupSpecs {
if updateSpec, ok := specMap[spec.GroupName]; ok {
newSpec := updateWorkerGroupSpec(updateSpec, spec)
workerGroupSpecs[i] = newSpec
}
}
return workerGroupSpecs
}

func updateWorkerGroupSpec(updateSpec *api.WorkerGroupUpdateSpec, workerGroupSpec rayalphaapi.WorkerGroupSpec) rayalphaapi.WorkerGroupSpec {
replicas := updateSpec.Replicas
minReplicas := updateSpec.MinReplicas
maxReplicas := updateSpec.MaxReplicas

workerGroupSpec.Replicas = &replicas
workerGroupSpec.MinReplicas = &minReplicas
workerGroupSpec.MaxReplicas = &maxReplicas
return workerGroupSpec
}

func UpdateServeDeploymentGraphSpec(updateSpecs *api.ServeDeploymentGraphSpec, serveDeploymentGraphspec rayalphaapi.ServeDeploymentGraphSpec) rayalphaapi.ServeDeploymentGraphSpec {
if updateSpecs.ImportPath != "" {
serveDeploymentGraphspec.ImportPath = updateSpecs.ImportPath
}
if updateSpecs.RuntimeEnv != "" {
serveDeploymentGraphspec.RuntimeEnv = base64.StdEncoding.EncodeToString([]byte(updateSpecs.RuntimeEnv))
}

if updateSpecs.ServeConfigs != nil {
specMap := map[string]*api.ServeConfig{}
for _, spec := range updateSpecs.ServeConfigs {
if spec != nil {
specMap[spec.DeploymentName] = spec
}
}
for i, spec := range serveDeploymentGraphspec.ServeConfigSpecs {
if updateSpec, ok := specMap[spec.Name]; ok {
newSpec := updateServeConfigSpec(updateSpec, spec)
serveDeploymentGraphspec.ServeConfigSpecs[i] = newSpec
}
}
}
return serveDeploymentGraphspec
}

func updateServeConfigSpec(updateSpec *api.ServeConfig, serveConfigSpec rayalphaapi.ServeConfigSpec) rayalphaapi.ServeConfigSpec {
if updateSpec.Replicas != 0 {
serveConfigSpec.NumReplicas = &updateSpec.Replicas
}
if updateSpec.ActorOptions.CpusPerActor != 0 {
serveConfigSpec.RayActorOptions.NumCpus = &updateSpec.ActorOptions.CpusPerActor
}
if updateSpec.ActorOptions.GpusPerActor != 0 {
serveConfigSpec.RayActorOptions.NumGpus = &updateSpec.ActorOptions.GpusPerActor
}
if updateSpec.ActorOptions.MemoryPerActor != 0 {
serveConfigSpec.RayActorOptions.Memory = &updateSpec.ActorOptions.MemoryPerActor
}
return serveConfigSpec
}
Loading

0 comments on commit 599e74b

Please sign in to comment.