Skip to content

Commit

Permalink
add-state-machine-and-exposing-port (#319)
Browse files Browse the repository at this point in the history
* add-state-machine-and-exposing-port

* remove code conflict

Co-authored-by: chenyu.jiang <[email protected]>
  • Loading branch information
scarlet25151 and chenyu.jiang authored Jul 2, 2022
1 parent 74cf314 commit d1e0d1a
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 109 deletions.
13 changes: 7 additions & 6 deletions apiserver/pkg/model/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ func FromCrdToApiClusters(clusters []*v1alpha1.RayCluster) []*api.Cluster {

func FromCrdToApiCluster(cluster *v1alpha1.RayCluster) *api.Cluster {
pbCluster := &api.Cluster{
Name: cluster.Name,
Namespace: cluster.Namespace,
Version: cluster.Labels[util.RayClusterVersionLabelKey],
User: cluster.Labels[util.RayClusterUserLabelKey],
Environment: api.Cluster_Environment(api.Cluster_Environment_value[cluster.Labels[util.RayClusterEnvironmentLabelKey]]),
CreatedAt: &timestamp.Timestamp{Seconds: cluster.CreationTimestamp.Unix()},
Name: cluster.Name,
Namespace: cluster.Namespace,
Version: cluster.Labels[util.RayClusterVersionLabelKey],
User: cluster.Labels[util.RayClusterUserLabelKey],
Environment: api.Cluster_Environment(api.Cluster_Environment_value[cluster.Labels[util.RayClusterEnvironmentLabelKey]]),
CreatedAt: &timestamp.Timestamp{Seconds: cluster.CreationTimestamp.Unix()},
ClusterState: string(cluster.Status.State),
}

// loop container and find the resource
Expand Down
3 changes: 3 additions & 0 deletions proto/cluster.proto
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ message Cluster {

// Output. The time that the cluster deleted.
google.protobuf.Timestamp deleted_at = 8;

// Output. The status to show the cluster status.state
string cluster_state = 9;
}

message ClusterSpec {
Expand Down
213 changes: 112 additions & 101 deletions proto/go_client/cluster.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions proto/swagger/cluster.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,10 @@
"type": "string",
"format": "date-time",
"description": "Output. The time that the cluster deleted."
},
"clusterState": {
"type": "string",
"title": "Output. The status to show the cluster status.state"
}
}
},
Expand Down
45 changes: 45 additions & 0 deletions ray-operator/controllers/ray/common/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package common

import (
"bytes"
"errors"
"fmt"
"strconv"
"strings"
Expand All @@ -23,6 +24,7 @@ const (
RayLogVolumeName = "ray-logs"
RayLogVolumeMountPath = "/tmp/ray"
AutoscalerContainerName = "autoscaler"
RayHeadContainer = "ray-head"
)

var log = logf.Log.WithName("RayCluster-Controller")
Expand Down Expand Up @@ -549,3 +551,46 @@ func findMemoryReqOrLimit(container v1.Container) (res *resource.Quantity) {
}
return nil
}

// ValidateHeadRayStartParams will do some validations for the head node RayStartParams,
// return include the bool to judge if the RayStartParams is valid and err will include some information for the warning or error output.
// if isValid is true, even if there maybe some error message, it is still acceptable for a ray and only affect the performance
// if isValid is false, it means the RayStartParams will definitely casue a unhealthy or failed status in ray cluster.
func ValidateHeadRayStartParams(rayHeadGroupSpec rayiov1alpha1.HeadGroupSpec) (isValid bool, err error) {
var objectStoreMemory int64
rayStartParams := rayHeadGroupSpec.RayStartParams
// validation for the object store memory
if objectStoreMemoryStr, ok := rayStartParams["object-store-memory"]; ok {
objectStoreMemory, err = strconv.ParseInt(objectStoreMemoryStr, 10, 64)
if err != nil {
isValid = false
err = errors.New("convert error of the \"object-store-memory\"")
return
}
for _, container := range rayHeadGroupSpec.Template.Spec.Containers {
// choose the ray container.
if container.Name == RayHeadContainer {
if shmSize, ok := container.Resources.Requests.Storage().AsInt64(); ok && objectStoreMemory > shmSize {
if envVarExists("RAY_OBJECT_STORE_ALLOW_SLOW_STORAGE", container.Env) {
// in ray if RAY_OBJECT_STORE_ALLOW_SLOW_STORAGE is set, it will only affect the performance.
isValid = true
log.Info(fmt.Sprintf("object store memory exceed the size of the share memory in head node, object-store-memory:%d, share memory size:%d\n", objectStoreMemory, shmSize) +
"This will harm performance. Consider deleting files in /dev/shm or increasing request memory of head node.")
err = errors.New("RayStartParams unhealthy")
return
} else {
// if not set, the head node may crash and result in an unhealthy status.
isValid = false
log.Info(fmt.Sprintf("object store memory exceed the size of the share memory in head node, object-store-memory:%d, share memory size:%d\n", objectStoreMemory, shmSize) +
"This will lead to a ValueError in ray! Consider deleting files in /dev/shm or increasing request memory of head node" +
"To ignore this warning, set an environment variable in headGroupSpec: RAY_OBJECT_STORE_ALLOW_SLOW_STORAGE=1")
err = errors.New("RayStartParams unhealthy")
return
}
}
}
}
}
// default return
return true, nil
}
39 changes: 37 additions & 2 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,21 +105,39 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, request ctrl.Reque
return ctrl.Result{}, nil
}
if err := r.reconcileAutoscalerServiceAccount(instance); err != nil {
if updateErr := r.updateClusterState(instance, rayiov1alpha1.Failed); updateErr != nil {
log.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
}
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
if err := r.reconcileAutoscalerRole(instance); err != nil {
if updateErr := r.updateClusterState(instance, rayiov1alpha1.Failed); updateErr != nil {
log.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
}
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
if err := r.reconcileAutoscalerRoleBinding(instance); err != nil {
if updateErr := r.updateClusterState(instance, rayiov1alpha1.Failed); updateErr != nil {
log.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
}
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
if err := r.reconcileIngress(instance); err != nil {
if updateErr := r.updateClusterState(instance, rayiov1alpha1.Failed); updateErr != nil {
log.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
}
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
if err := r.reconcileServices(instance); err != nil {
if updateErr := r.updateClusterState(instance, rayiov1alpha1.Failed); updateErr != nil {
log.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
}
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
if err := r.reconcilePods(instance); err != nil {
if updateErr := r.updateClusterState(instance, rayiov1alpha1.Failed); updateErr != nil {
log.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
}
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
// update the status if needed
Expand Down Expand Up @@ -605,8 +623,20 @@ func (r *RayClusterReconciler) updateStatus(instance *rayiov1alpha1.RayCluster)
instance.Status.MaxWorkerReplicas = count
}

// TODO (@Jeffwan): Update state field later.
// We always update instance no matter if there's one change or not.
// validation for the RayStartParam for the state.
isValid, err := common.ValidateHeadRayStartParams(instance.Spec.HeadGroupSpec)
if err != nil {
r.Recorder.Event(instance, v1.EventTypeWarning, "Parameters conflict", err.Error())
}
// only in invalid status that we update the status to unhealthy.
if !isValid {
instance.Status.State = rayiov1alpha1.UnHealthy
} else {
if utils.CheckAllPodsRunnning(runtimePods) {
instance.Status.State = rayiov1alpha1.Ready
}
}

timeNow := metav1.Now()
instance.Status.LastUpdateTime = &timeNow
if err := r.Status().Update(context.Background(), instance); err != nil {
Expand Down Expand Up @@ -739,3 +769,8 @@ func (r *RayClusterReconciler) reconcileAutoscalerRoleBinding(instance *rayiov1a

return nil
}

func (r *RayClusterReconciler) updateClusterState(instance *rayiov1alpha1.RayCluster, clusterState rayiov1alpha1.ClusterState) error {
instance.Status.State = clusterState
return r.Status().Update(context.Background(), instance)
}
10 changes: 10 additions & 0 deletions ray-operator/controllers/ray/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,16 @@ func GetHeadGroupServiceAccountName(cluster *rayiov1alpha1.RayCluster) string {
return cluster.Name
}

// CheckAllPodsRunnning check if all pod in a list is running
func CheckAllPodsRunnning(runningPods corev1.PodList) bool {
for _, pod := range runningPods.Items {
if pod.Status.Phase != corev1.PodRunning {
return false
}
}
return true
}

func PodNotMatchingTemplate(pod corev1.Pod, template corev1.PodTemplateSpec) bool {
if pod.Status.Phase == corev1.PodRunning && pod.ObjectMeta.DeletionTimestamp == nil {
if len(template.Spec.Containers) != len(pod.Spec.Containers) {
Expand Down

0 comments on commit d1e0d1a

Please sign in to comment.