[core][autoscaler] Autoscaler doesn't scale up correctly when the KubeRay RayCluster is not in the goal state #48909

Merged (6 commits) on Nov 26, 2024
2 changes: 1 addition & 1 deletion python/ray/autoscaler/kuberay/ray-cluster.complete.yaml
@@ -108,7 +108,7 @@ spec:
workerGroupSpecs:
# the pod replicas in this group typed worker
- replicas: 1
minReplicas: 1
minReplicas: 0
maxReplicas: 300
# logical group name, for this called small-group, also can be functional
groupName: small-group
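With `minReplicas` lowered to 0, the example's `small-group` can now scale down to zero workers. For reference, a rough sketch of the autoscaling config entry this group maps to, mirroring the updated expectations in `test_autoscaling_config.py` at the end of this diff (the `resources` field is truncated there, so it is shown only partially and is illustrative):

# Illustrative only: the "small-group" entry of the derived autoscaling config
# after this change, based on the updated test expectations below.
small_group = {
    "max_workers": 300,  # from maxReplicas
    "min_workers": 0,    # from the new minReplicas: 0
    "node_config": {},
    "resources": {"CPU": 1},  # truncated in the test; partial here
}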
@@ -209,21 +209,25 @@ def _initialize_scale_request(
cur_instances = self.instances

# Get the worker groups that have pending deletes and the worker groups that
# have finished deletes.
# have finished deletes, and the set of workers included in the workersToDelete
# field of any worker group.
(
worker_groups_with_pending_deletes,
worker_groups_without_pending_deletes,
) = self._get_workers_groups_with_deletes(
ray_cluster, set(cur_instances.keys())
)
worker_to_delete_set,
) = self._get_workers_delete_info(ray_cluster, set(cur_instances.keys()))

# Calculate the desired number of workers by type.
num_workers_dict = defaultdict(int)
for _, cur_instance in cur_instances.items():
if cur_instance.node_kind == NodeKind.HEAD:
# Only track workers.
continue
num_workers_dict[cur_instance.node_type] += 1
worker_groups = ray_cluster["spec"].get("workerGroupSpecs", [])
for worker_group in worker_groups:
node_type = worker_group["groupName"]
# Handle the case where users manually increase `minReplicas`
# to scale up the number of worker Pods. In this scenario,
# `replicas` will be smaller than `minReplicas`.
num_workers_dict[node_type] = max(
worker_group["replicas"], worker_group["minReplicas"]
)

# Add to launch nodes.
for node_type, count in to_launch.items():
@@ -242,6 +246,11 @@
# Not possible to delete head node.
continue

if to_delete_instance.cloud_instance_id in worker_to_delete_set:
# If the instance is already in the workersToDelete field of
# any worker group, skip it.
continue

num_workers_dict[to_delete_instance.node_type] -= 1
assert num_workers_dict[to_delete_instance.node_type] >= 0
to_delete_instances_by_type[to_delete_instance.node_type].append(
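Taken together, these two hunks change how the provider derives the per-group goal state: the desired count starts from `max(replicas, minReplicas)` in the CR rather than the live Pod count, and a termination no longer decrements the count if the Pod is already listed in `workersToDelete`. A minimal, self-contained sketch of that calculation (plain dicts stand in for the provider's instance objects, and `sketch_desired_counts` is a hypothetical helper, not the actual `_initialize_scale_request`):

from collections import defaultdict

def sketch_desired_counts(ray_cluster, to_launch, to_delete_instances, worker_to_delete_set):
    # Start from the CR's goal state rather than the live Pod count: if a user
    # manually raised minReplicas above replicas, minReplicas wins.
    counts = defaultdict(int)
    for group in ray_cluster["spec"].get("workerGroupSpecs", []):
        counts[group["groupName"]] = max(group["replicas"], group["minReplicas"])

    # Requested launches increase the desired count per node type.
    for node_type, count in to_launch.items():
        counts[node_type] += count

    # Requested terminations decrease it, unless the Pod is the head node or is
    # already listed in some group's scaleStrategy.workersToDelete (KubeRay is
    # already deleting those, so decrementing again would double-count).
    for inst in to_delete_instances:
        if inst["node_kind"] == "HEAD" or inst["cloud_instance_id"] in worker_to_delete_set:
            continue
        counts[inst["node_type"]] -= 1
    return counts

# Example: replicas=0 but minReplicas=3 (user scaled up via minReplicas),
# plus one launch request -> the patched replicas value becomes 4.
cluster = {"spec": {"workerGroupSpecs": [
    {"groupName": "small-group", "replicas": 0, "minReplicas": 3}]}}
print(sketch_desired_counts(cluster, {"small-group": 1}, [], set()))
# defaultdict(<class 'int'>, {'small-group': 4})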
@@ -321,6 +330,7 @@ def _submit_scale_request(
# No patch required.
return

logger.info(f"Submitting a scale request: {scale_request}")
self._patch(f"rayclusters/{self._cluster_name}", patch_payload)

def _add_launch_errors(
@@ -392,9 +402,9 @@ def instances(self) -> Dict[CloudInstanceId, CloudInstance]:
return copy.deepcopy(self._cached_instances)

@staticmethod
def _get_workers_groups_with_deletes(
def _get_workers_delete_info(
ray_cluster_spec: Dict[str, Any], node_set: Set[CloudInstanceId]
) -> Tuple[Set[NodeType], Set[NodeType]]:
) -> Tuple[Set[NodeType], Set[NodeType], Set[CloudInstanceId]]:
"""
Gets the worker groups that have pending deletes and the worker groups that
have finished deletes.
@@ -404,10 +414,13 @@ def _get_workers_groups_with_deletes(
deletes.
worker_groups_with_finished_deletes: The worker groups that have finished
deletes.
worker_to_delete_set: A set of Pods that are included in the workersToDelete
field of any worker group.
"""

worker_groups_with_pending_deletes = set()
worker_groups_with_deletes = set()
worker_to_delete_set = set()

worker_groups = ray_cluster_spec["spec"].get("workerGroupSpecs", [])
for worker_group in worker_groups:
@@ -422,14 +435,19 @@
worker_groups_with_deletes.add(node_type)

for worker in workersToDelete:
worker_to_delete_set.add(worker)
if worker in node_set:
worker_groups_with_pending_deletes.add(node_type)
break

worker_groups_with_finished_deletes = (
worker_groups_with_deletes - worker_groups_with_pending_deletes
)
return worker_groups_with_pending_deletes, worker_groups_with_finished_deletes
return (
worker_groups_with_pending_deletes,
worker_groups_with_finished_deletes,
worker_to_delete_set,
)

Contributor: nit: function signature as well.

Member Author: rename the function?

Member Author: renamed the function: 24ba62a

Contributor: oops, sorry, I meant the return types. It's currently saying it's returning a tuple of 2, while it's actually 3.

Member Author: good catch! updated 2c9c3d0

def _fetch_instances(self) -> Dict[CloudInstanceId, CloudInstance]:
"""
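To make the new three-part return value concrete, here is a small self-contained sketch that restates what the docstring above promises (a hypothetical `workers_delete_info` helper over plain dicts, not the provider's actual implementation):

from typing import Any, Dict, Set, Tuple

def workers_delete_info(
    ray_cluster_spec: Dict[str, Any], node_set: Set[str]
) -> Tuple[Set[str], Set[str], Set[str]]:
    # Restates the documented contract:
    #  - pending: groups whose workersToDelete still include a live Pod,
    #  - finished: groups that list workersToDelete but none are live anymore,
    #  - worker_to_delete_set: every Pod named in any workersToDelete list.
    pending: Set[str] = set()
    groups_with_deletes: Set[str] = set()
    worker_to_delete_set: Set[str] = set()
    for group in ray_cluster_spec["spec"].get("workerGroupSpecs", []):
        workers_to_delete = group.get("scaleStrategy", {}).get("workersToDelete", [])
        if not workers_to_delete:
            continue
        groups_with_deletes.add(group["groupName"])
        for worker in workers_to_delete:
            worker_to_delete_set.add(worker)
            if worker in node_set:
                pending.add(group["groupName"])
    return pending, groups_with_deletes - pending, worker_to_delete_set

# Example: small-group still has a live Pod queued for deletion, while
# gpu-group's listed Pod is already gone.
spec = {"spec": {"workerGroupSpecs": [
    {"groupName": "small-group", "scaleStrategy": {"workersToDelete": ["pod-a"]}},
    {"groupName": "gpu-group", "scaleStrategy": {"workersToDelete": ["pod-b"]}},
]}}
print(workers_delete_info(spec, node_set={"pod-a"}))
# -> ({'small-group'}, {'gpu-group'}, {'pod-a', 'pod-b'})  (set order may vary)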
118 changes: 118 additions & 0 deletions python/ray/autoscaler/v2/tests/test_node_provider.py
@@ -492,6 +492,124 @@ def test_pending_deletes(self):
},
]

def test_increase_min_replicas_to_scale_up(self):
# Simulate the case where users manually increase the `minReplicas` field
# from 0 to $num_pods. KubeRay will create $num_pods worker Pods to meet the new
# `minReplicas`, even though the `replicas` field is still 0.
small_group = "small-group"
num_pods = 0
assert (
self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0]["groupName"]
== small_group
)
for pod in self.mock_client._pod_list["items"]:
if pod["metadata"]["labels"]["ray.io/group"] == small_group:
num_pods += 1
assert num_pods > 0
self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0]["replicas"] = 0
self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0][
"minReplicas"
] = num_pods

# Launch a new node; `replicas` should become
# `max(replicas, minReplicas) + 1`.
self.provider.launch(shape={small_group: 1}, request_id="launch-1")
patches = self.mock_client.get_patches(
f"rayclusters/{self.provider._cluster_name}"
)
assert len(patches) == 1
assert patches[0] == {
"op": "replace",
"path": "/spec/workerGroupSpecs/0/replicas",
"value": num_pods + 1,
}

def test_inconsistent_pods_raycr_scale_up(self):
"""
Test the case where the cluster state has not yet reached the desired state.
Specifically, the replicas field in the RayCluster CR does not match the actual
number of Pods.
"""
# Check the assumptions of the test
small_group = "small-group"
num_pods = 0
for pod in self.mock_client._pod_list["items"]:
if pod["metadata"]["labels"]["ray.io/group"] == small_group:
num_pods += 1

assert (
self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0]["groupName"]
== small_group
)
desired_replicas = num_pods + 1
self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0][
"replicas"
] = desired_replicas

# Launch a new node. The replicas field should be incremented by 1, even though
# the cluster state has not yet reached the goal state.
launch_request = {"small-group": 1}
self.provider.launch(shape=launch_request, request_id="launch-1")

patches = self.mock_client.get_patches(
f"rayclusters/{self.provider._cluster_name}"
)
assert len(patches) == 1
assert patches[0] == {
"op": "replace",
"path": "/spec/workerGroupSpecs/0/replicas",
"value": desired_replicas + 1,
}
Contributor: nit: is it possible to also add tests verifying that the to-be-deleted workers are handled correctly when calculating the goal state, as a regression check for this change?

Member Author: added 24ba62a

def test_inconsistent_pods_raycr_scale_down(self):
"""
Test the case where the cluster state has not yet reached the desired state.
Specifically, the replicas field in the RayCluster CR does not match the actual
number of Pods.
"""
# Check the assumptions of the test
small_group = "small-group"
num_pods = 0
pod_to_delete = None
for pod in self.mock_client._pod_list["items"]:
if pod["metadata"]["labels"]["ray.io/group"] == small_group:
num_pods += 1
pod_to_delete = pod["metadata"]["name"]
assert pod_to_delete is not None

assert (
self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0]["groupName"]
== small_group
)
desired_replicas = num_pods + 1
self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0][
"replicas"
] = desired_replicas

# Terminate a node. The replicas field should be decremented by 1, even though
# the cluster state has not yet reached the goal state.
self.provider.terminate(ids=[pod_to_delete], request_id="term-1")
patches = self.mock_client.get_patches(
f"rayclusters/{self.provider._cluster_name}"
)
assert len(patches) == 2
assert patches == [
{
"op": "replace",
"path": "/spec/workerGroupSpecs/0/replicas",
"value": desired_replicas - 1,
},
{
"op": "replace",
"path": "/spec/workerGroupSpecs/0/scaleStrategy",
"value": {
"workersToDelete": [
pod_to_delete,
]
},
},
]


if __name__ == "__main__":
if os.environ.get("PARALLEL_CI"):
6 changes: 3 additions & 3 deletions python/ray/tests/kuberay/test_autoscaling_config.py
@@ -82,7 +82,7 @@ def _get_basic_autoscaling_config() -> dict:
},
"small-group": {
"max_workers": 300,
"min_workers": 1,
"min_workers": 0,
"node_config": {},
"resources": {
"CPU": 1,
@@ -95,7 +95,7 @@
# and modified max_workers.
"gpu-group": {
"max_workers": 200,
"min_workers": 1,
"min_workers": 0,
"node_config": {},
"resources": {
"CPU": 1,
@@ -109,7 +109,7 @@
# and modified max_workers and node_config.
"tpu-group": {
"max_workers": 4,
"min_workers": 1,
"min_workers": 0,
"node_config": {},
"resources": {
"CPU": 1,