[kuberay][autoscaler] Use new autoscaling fields from the KubeRay operator #25386

Merged Jun 9, 2022

Commits (35, all authored by DmitriGekhtman)
d05c5ab  Update init-config and patch. (Jun 1, 2022)
78a7333  Remove rbac creation. (Jun 1, 2022)
7560b70  Fix patch. (Jun 1, 2022)
f72b2d6  Update autoscaler config. (Jun 1, 2022)
17b3fe9  Update default autoscaler image. (Jun 1, 2022)
16db7a6  Insert autoscaler options. (Jun 1, 2022)
b054024  Remove rbac from test. (Jun 1, 2022)
d6f744d  Allow autoscaler to trigger scale-down without maxReplicas change. (Jun 1, 2022)
4e050d8  Process autoscaler options. (Jun 1, 2022)
ac51170  Name keys. (Jun 1, 2022)
3e4feb0  Add a test. (Jun 2, 2022)
9821411  spec, not sepc (Jun 2, 2022)
fe10728  comment (Jun 2, 2022)
9099aa5  Create not apply. (Jun 2, 2022)
95aacee  Fix patch. (Jun 2, 2022)
7c50383  Fix config. (Jun 2, 2022)
11d8f21  lint (Jun 2, 2022)
d63ecbe  Remove service account. (Jun 3, 2022)
50975a3  Remove volume. (Jun 3, 2022)
1cc1fb7  Informative API failures. (Jun 3, 2022)
84b2e3e  Update commit. (Jun 3, 2022)
f5874dd  update-patch (Jun 3, 2022)
ee9547e  Merge branch 'master' into dmitri/align-repos (Jun 3, 2022)
821ebc1  Validate scale-down. (Jun 3, 2022)
6a93a60  Factor out wait-for-logs decorator. (Jun 5, 2022)
02d1c71  Remove RBAC removal. (Jun 5, 2022)
490d1c6  Fix arg. Doc string wording. (Jun 5, 2022)
93701fc  Reset stdout earlier. (Jun 5, 2022)
9942de4  Merge branch 'master' into dmitri/align-repos (Jun 6, 2022)
6d02b61  Provide time of last use. (Jun 6, 2022)
49ae388  Fix the test. (Jun 7, 2022)
6567d2e  Fix bug. (Jun 7, 2022)
f147632  remove breakpoint (Jun 7, 2022)
606c464  Update commit. (Jun 9, 2022)
eaf490b  Insert pull policy. (Jun 9, 2022)
3 changes: 1 addition & 2 deletions doc/source/cluster/kuberay.md
@@ -24,8 +24,7 @@ Now you can deploy the KubeRay operator using

```shell
./ray/python/ray/autoscaler/kuberay/init-config.sh
-kubectl apply -k "ray/python/ray/autoscaler/kuberay/config/default"
-kubectl apply -f "ray/python/ray/autoscaler/kuberay/kuberay-autoscaler-rbac.yaml"
+kubectl create -k "ray/python/ray/autoscaler/kuberay/config/default"
```

You can verify that the operator has been deployed using
48 changes: 48 additions & 0 deletions python/ray/_private/test_utils.py
@@ -1,4 +1,5 @@
import asyncio
import functools
import io
import fnmatch
import os
@@ -397,6 +398,53 @@ async def async_wait_for_condition(
raise RuntimeError(message)


def wait_for_stdout(strings_to_match: List[str], timeout_s: int):
    """Returns a decorator which waits until the stdout emitted
    by a function contains the provided list of strings.
    Raises an exception if the stdout doesn't have the expected output in time.

    Args:
        strings_to_match: Wait until stdout contains all of these strings.
        timeout_s: Max time to wait, in seconds, before raising a RuntimeError.
    """

    def decorator(func):
        @functools.wraps(func)
        def decorated_func(*args, **kwargs):
            success = False
            try:
                # Redirect stdout to an in-memory stream.
                out_stream = io.StringIO()
                sys.stdout = out_stream
                # Execute the func.
                out = func(*args, **kwargs)
                # Check out_stream once a second until the timeout.
                # Raise a RuntimeError if we time out.
                wait_for_condition(
                    # Does redirected stdout contain all of the expected strings?
                    lambda: all(
                        string in out_stream.getvalue() for string in strings_to_match
                    ),
                    timeout=timeout_s,
                    retry_interval_ms=1000,
                )
                # out_stream has the expected strings
                success = True
                return out
            finally:
                sys.stdout = sys.__stdout__
                if success:
                    print("Confirmed expected function stdout. Stdout follows:")
                else:
                    print("Did not confirm expected function stdout. Stdout follows:")
                print(out_stream.getvalue())
                out_stream.close()

        return decorated_func

    return decorator


def wait_until_succeeded_without_exception(
func, exceptions, *args, timeout_ms=1000, retry_interval_ms=100, raise_last_ex=False
):
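A minimal usage sketch of the new decorator (illustrative only, not part of this diff; the decorated function and the printed string are made up). The decorator captures everything the function writes to stdout and raises a RuntimeError if the expected strings don't all appear within `timeout_s` seconds:

```python
from ray._private.test_utils import wait_for_stdout


@wait_for_stdout(strings_to_match=["Adding 1 nodes of type small-group."], timeout_s=15)
def trigger_scale_up():
    # Anything printed here (e.g. autoscaler events echoed to the driver's
    # stdout) is captured by the decorator's redirected stream.
    print("Adding 1 nodes of type small-group.")


trigger_scale_up()
```

The real usages added by this PR are in `scale_up.py` and `scale_down.py` below.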
6 changes: 6 additions & 0 deletions python/ray/autoscaler/_private/autoscaler.py
@@ -456,6 +456,12 @@ def keep_node(node_id: NodeID) -> None:
node_ip = self.provider.internal_ip(node_id)
if node_ip in last_used and last_used[node_ip] < horizon:
self.schedule_node_termination(node_id, "idle", logger.info)
# Get the local time of the node's last use as a string.
formatted_last_used_time = time.asctime(
time.localtime(last_used[node_ip])
)
logger.info(f"Node last used: {formatted_last_used_time}.")
# Note that the current time will appear in the log prefix.
elif not self.launch_config_ok(node_id):
self.schedule_node_termination(node_id, "outdated", logger.info)
else:
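For reference, a quick sketch of what `time.asctime(time.localtime(...))` produces for a `last_used` timestamp (the timestamp value is made up; exact output depends on the machine's local timezone):

```python
import time

# Seconds since the epoch, as stored in last_used[node_ip].
last_used_timestamp = 1654560000.0
print(time.asctime(time.localtime(last_used_timestamp)))
# -> e.g. "Tue Jun  7 00:00:00 2022" when the local timezone is UTC
```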
22 changes: 19 additions & 3 deletions python/ray/autoscaler/_private/kuberay/autoscaling_config.py
@@ -16,6 +16,10 @@

logger = logging.getLogger(__name__)

AUTOSCALER_OPTIONS_KEY = "autoscalerOptions"
IDLE_SECONDS_KEY = "idleTimeoutSeconds"
UPSCALING_KEY = "upscalingMode"
UPSCALING_VALUE_AGGRESSIVE = "Aggressive"

# Logical group name for the KubeRay head group.
# Used as the name of the "head node type" by the autoscaler.
@@ -54,7 +58,8 @@ def _fetch_ray_cr_from_k8s(self) -> Dict[str, Any]:
result = requests.get(
self._ray_cr_url, headers=self._headers, verify=self._verify
)
-        assert result.status_code == 200
+        if not result.status_code == 200:
+            result.raise_for_status()
ray_cr = result.json()
return ray_cr

@@ -75,6 +80,17 @@ def _derive_autoscaling_config_from_ray_cr(ray_cr: Dict[str, Any]) -> Dict[str,
# Legacy autoscaling fields carry no information but are required for compatibility.
legacy_autoscaling_fields = _generate_legacy_autoscaling_config_fields()

# Process autoscaler options.
autoscaler_options = ray_cr["spec"].get(AUTOSCALER_OPTIONS_KEY, {})
if IDLE_SECONDS_KEY in autoscaler_options:
idle_timeout_minutes = autoscaler_options[IDLE_SECONDS_KEY] / 60.0
else:
idle_timeout_minutes = 5.0
if autoscaler_options.get(UPSCALING_KEY) == UPSCALING_VALUE_AGGRESSIVE:
upscaling_speed = 1000 # i.e. big
else:
upscaling_speed = 1

autoscaling_config = {
"provider": provider_config,
"cluster_name": ray_cr["metadata"]["name"],
@@ -83,10 +99,10 @@ def _derive_autoscaling_config_from_ray_cr(ray_cr: Dict[str, Any]) -> Dict[str,
"max_workers": global_max_workers,
# Should consider exposing `idleTimeoutMinutes` in the RayCluster CRD,
# under an `autoscaling` field.
"idle_timeout_minutes": 5,
"idle_timeout_minutes": idle_timeout_minutes,
# Should consider exposing `upscalingSpeed` in the RayCluster CRD,
# under an `autoscaling` field.
"upscaling_speed": 1,
"upscaling_speed": upscaling_speed,
**legacy_autoscaling_fields,
}

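To make the new mapping concrete, here is a small sketch of how the `autoscalerOptions` fields translate into autoscaling config values (the CR dict is a made-up example that mirrors the logic in `_derive_autoscaling_config_from_ray_cr`, not a public API):

```python
# A trimmed RayCluster spec carrying the new autoscalerOptions fields.
ray_cr_spec = {
    "autoscalerOptions": {
        "idleTimeoutSeconds": 60,
        "upscalingMode": "Aggressive",
    }
}

options = ray_cr_spec.get("autoscalerOptions", {})
# 60 seconds -> 1.0 minutes; equivalent to defaulting to 5.0 minutes when absent.
idle_timeout_minutes = options.get("idleTimeoutSeconds", 300) / 60.0
# "Aggressive" maps to a very large upscaling_speed; anything else maps to 1.
upscaling_speed = 1000 if options.get("upscalingMode") == "Aggressive" else 1

assert idle_timeout_minutes == 1.0 and upscaling_speed == 1000
```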
6 changes: 4 additions & 2 deletions python/ray/autoscaler/_private/kuberay/node_provider.py
@@ -198,7 +198,8 @@ def _get(self, path: str) -> Dict[str, Any]:
"""Wrapper for REST GET of resource with proper headers."""
url = url_from_resource(namespace=self.namespace, path=path)
result = requests.get(url, headers=self.headers, verify=self.verify)
-        assert result.status_code == 200
+        if not result.status_code == 200:
+            result.raise_for_status()
return result.json()

def _get_non_terminating_pods(
@@ -243,7 +244,8 @@ def _patch(self, path: str, payload: List[Dict[str, Any]]) -> Dict[str, Any]:
headers={**self.headers, "Content-type": "application/json-patch+json"},
verify=self.verify,
)
-        assert result.status_code == 200
+        if not result.status_code == 200:
+            result.raise_for_status()
return result.json()

def create_node(
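The point of swapping `assert` for `raise_for_status()` (per the "Informative API failures" commit) is that a failed Kubernetes API call now surfaces the status code and URL instead of a bare `AssertionError`. A standalone sketch of the behavior, using a public test endpoint rather than the Kubernetes API server:

```python
import requests

result = requests.get("https://httpbin.org/status/404")
if not result.status_code == 200:
    # Raises requests.exceptions.HTTPError, e.g.
    # "404 Client Error: NOT FOUND for url: https://httpbin.org/status/404"
    result.raise_for_status()
```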
2 changes: 1 addition & 1 deletion python/ray/autoscaler/kuberay/init-config.sh
@@ -19,7 +19,7 @@ pushd "$DIR" || exit
# get updated together. It is important to keep this in mind when making
# changes. The CRD is designed to be stable so one operator can run many
# different versions of Ray.
-git checkout a46ba3f2946f9881d0ee9797f1c9d58d1df4eff4
+git checkout 69ecfceef5c966193ab87f22a9f49250b17e35fb
# Here is where we specify the docker image that is used for the operator.
# If you want to use your own version of Kuberay, you should change the content
# of kuberay-autoscaler.patch to point to your operator.
11 changes: 6 additions & 5 deletions python/ray/autoscaler/kuberay/kuberay-autoscaler.patch
@@ -1,16 +1,16 @@
diff --git a/ray-operator/config/default/kustomization.yaml b/ray-operator/config/default/kustomization.yaml
-index 7df72cd..b67c047 100644
+index 7df72cd..9db211a 100644
--- a/ray-operator/config/default/kustomization.yaml
+++ b/ray-operator/config/default/kustomization.yaml
@@ -23,5 +23,5 @@ bases:
images:
- name: kuberay/operator
newName: kuberay/operator
 - newTag: nightly
-+ newTag: a46ba3f
++ newTag: 69ecfce

diff --git a/ray-operator/config/manager/manager.yaml b/ray-operator/config/manager/manager.yaml
-index ef5318f..b5f32bf 100644
+index 1ddde41..274bfc9 100644
--- a/ray-operator/config/manager/manager.yaml
+++ b/ray-operator/config/manager/manager.yaml
@@ -21,8 +21,8 @@ spec:
@@ -22,5 +22,6 @@ index ef5318f..b5f32bf 100644
+ args:
+ - --prioritize-workers-to-delete
image: kuberay/operator
name: ray-manager
securityContext:
ports:
- name: http

55 changes: 5 additions & 50 deletions python/ray/autoscaler/kuberay/ray-cluster.complete.yaml
@@ -8,8 +8,9 @@ metadata:
# An unique identifier for the head node and workers of this cluster.
name: raycluster-complete
spec:
-  rayVersion: '1.12.0'
-  enableInTreeAutoscaling: false
+  rayVersion: '1.12.1'
+  # With enableInTreeAutoscaling: true, the operator will insert an autoscaler sidecar container into the Ray head pod.
+  enableInTreeAutoscaling: true
######################headGroupSpecs#################################
# head group template and specs, (perhaps 'group' is not needed in the name)
headGroupSpec:
@@ -52,14 +53,11 @@ spec:
annotations:
key: value
spec:
-        # This is needed to give the autoscaler side car permissions to query and update
-        # definitions in the Kubernetes API server (see kuberay-autoscaler-rbac.yaml)
-        serviceAccountName: autoscaler-sa
containers:
# The Ray head pod
- name: ray-head
# All Ray pods in the RayCluster should use the same version of Ray.
-          image: rayproject/ray:1.12.0
+          image: rayproject/ray:1.12.1
imagePullPolicy: Always
# The KubeRay operator uses the ports specified on the ray-head container
# to configure a service targeting the ports.
@@ -107,49 +105,6 @@ spec:
requests:
cpu: "500m"
memory: "512Mi"
-          volumeMounts:
-          - mountPath: /tmp/ray
-            name: ray-logs
-        # The Ray autoscaler sidecar to the head pod
-        - name: autoscaler
-          # The autoscaler image used carries the latest improvements to KubeRay autoscaling
-          # support.
-          # It is confirmed (via kuberay/test_autoscaling_e2e.py) to be compatible with all
-          # Ray versions since Ray 1.11.0.
-          # TODO: Use released Ray version when autoscaling support is stable.
-          image: rayproject/ray:c6d3ff
-          imagePullPolicy: Always
-          env:
-            - name: RAY_CLUSTER_NAMESPACE
-              valueFrom:
-                fieldRef:
-                  fieldPath: metadata.namespace
-            - name: RAY_CLUSTER_NAME
-              # This value must match the metadata.name of the RayCluster CR.
-              # The autoscaler uses this env variable to determine which Ray CR to interact with.
-              # TODO: Match with CR name automatically via operator, Helm, and/or Kustomize.
-              value: raycluster-complete
-          command: ["ray"]
-          args:
-            - "kuberay-autoscaler"
-            - "--cluster-name"
-            - "$(RAY_CLUSTER_NAME)"
-            - "--cluster-namespace"
-            - "$(RAY_CLUSTER_NAMESPACE)"
-          resources:
-            limits:
-              cpu: 500m
-              memory: 1024Mi
-            requests:
-              cpu: 250m
-              memory: 512Mi
-          volumeMounts:
-          - mountPath: /tmp/ray
-            name: ray-logs
-        volumes:
-        # You set volumes at the Pod level, then mount them into containers inside that Pod
-        - name: ray-logs
-          emptyDir: {}
workerGroupSpecs:
# the pod replicas in this group typed worker
- replicas: 1
@@ -193,7 +148,7 @@ spec:
containers:
- name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name', or '123-abc'
# All Ray pods in the RayCluster should use the same version of Ray.
-          image: rayproject/ray:1.12.0
+          image: rayproject/ray:1.12.1
# environment variables to set in the container.Optional.
# Refer to https://kubernetes.io/docs/tasks/inject-data-application/define-environment-variable-container/
env:
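A small sketch of how one might sanity-check the updated example CR after this change (assumes PyYAML is installed and the repository-relative path below):

```python
import yaml

# The example RayCluster CR updated by this PR.
with open("python/ray/autoscaler/kuberay/ray-cluster.complete.yaml") as f:
    cr = yaml.safe_load(f)

# The example now opts into operator-managed (in-tree) autoscaling
# and bumps the Ray version, per the diff above.
assert cr["spec"]["enableInTreeAutoscaling"] is True
assert cr["spec"]["rayVersion"] == "1.12.1"
```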
14 changes: 13 additions & 1 deletion python/ray/tests/kuberay/scripts/scale_down.py
@@ -1,8 +1,20 @@
 import ray
+from ray._private import test_utils
+
+SCALE_DOWN_GPU = "Removing 1 nodes of type fake-gpu-group (idle)."
+
 
+@test_utils.wait_for_stdout(strings_to_match=[SCALE_DOWN_GPU], timeout_s=25)
 def main():
-    """Removes CPU request, removes GPU actor."""
+    """Removes CPU request, removes GPU actor.
+    Waits for autoscaler scale-down events to get emitted to stdout.
+
+    The worker idle timeout is set to 10 seconds and the autoscaler's update interval is
+    5 seconds, so it should be enough to wait 15 seconds.
+    An extra ten seconds are added to the timeout as a generous buffer against
+    flakiness.
+    """
     # Remove resource demands
     ray.autoscaler.sdk.request_resources(num_cpus=0)
     gpu_actor = ray.get_actor("gpu_actor")
     ray.kill(gpu_actor)
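The sanity arithmetic behind `timeout_s=25`, using the values stated in the docstring above:

```python
# Worst case before the scale-down event can appear, per the docstring.
worker_idle_timeout_s = 10
autoscaler_update_interval_s = 5
flakiness_buffer_s = 10

assert worker_idle_timeout_s + autoscaler_update_interval_s + flakiness_buffer_s == 25
```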
46 changes: 8 additions & 38 deletions python/ray/tests/kuberay/scripts/scale_up.py
@@ -1,50 +1,20 @@
 import ray
 from ray._private import test_utils
-import io
-import sys
-
-SCALE_MESSAGE = "Adding 1 nodes of type small-group."
 
 
+@test_utils.wait_for_stdout(
+    strings_to_match=["Adding 1 nodes of type small-group."], timeout_s=15
+)
 def main():
     """Submits CPU request.
-    Waits for autoscaler scale-up event to get emitted to stdout."""
-    # Redirect stdout to a StringIO object.
-    out_stream = io.StringIO()
-    sys.stdout = out_stream
+    Wait 15 sec for autoscaler scale-up event to get emitted to stdout.
 
-    # Make the scale request
+    The autoscaler update interval is 5 sec, so it should be enough to wait 5 seconds.
+    An extra ten seconds are added to the timeout as a generous buffer against
+    flakiness.
+    """
     ray.autoscaler.sdk.request_resources(num_cpus=2)
 
-    def scale_event_logged() -> bool:
-        """Return True if the scale up event message is in the out_stream.
-        Otherwise, raise an Exception.
-        """
-        stdout_so_far = out_stream.getvalue()
-        if SCALE_MESSAGE in out_stream.getvalue():
-            return True
-        else:
-            # This error will be logged to stderr once if SCALE_MESSAGE doesn't
-            # appear in STDOUT in time.
-            raise RuntimeError(
-                "Expected autoscaler event not detected. Driver stdout follows:\n"
-                + stdout_so_far
-            )
-
-    # Wait for the expected autoscaler event message to appear.
-    # It should take at most (5 + epsilon) seconds for this to happen.
-    # To prevent flakiness, use a large timeout of 15 seconds.
-    test_utils.wait_for_condition(
-        condition_predictor=scale_event_logged, timeout=15, retry_interval_ms=1500
-    )
-
-    # Reset stdout and print driver logs.
-    sys.stdout = sys.__stdout__
-    print("Expected autoscaler event detected. Driver stdout follows:")
-    print(out_stream.getvalue())
-
-    out_stream.close()
 
 
 if __name__ == "__main__":
     ray.init("auto")
24 changes: 24 additions & 0 deletions python/ray/tests/kuberay/test_autoscaling_config.py
@@ -133,6 +133,22 @@ def _get_gpu_complaint() -> str:
)


def _get_ray_cr_with_autoscaler_options() -> dict:
    cr = _get_basic_ray_cr()
    cr["spec"]["autoscalerOptions"] = {
        "upscalingMode": "Aggressive",
        "idleTimeoutSeconds": 60,
    }
    return cr


def _get_autoscaling_config_with_options() -> dict:
    config = _get_basic_autoscaling_config()
    config["upscaling_speed"] = 1000
    config["idle_timeout_minutes"] = 1.0
    return config


PARAM_ARGS = ",".join(
[
"ray_cr_in",
@@ -179,6 +195,14 @@ def _get_gpu_complaint() -> str:
_get_gpu_complaint(),
id="gpu-complaint",
),
pytest.param(
_get_ray_cr_with_autoscaler_options(),
_get_autoscaling_config_with_options(),
None,
None,
None,
id="autoscaler-options",
),
]
)

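A quick way to exercise just the new parametrized case (hypothetical invocation, equivalent to running pytest on this file with `-k autoscaler-options`):

```python
import pytest

# Select only the pytest.param added above via its id.
pytest.main(
    [
        "python/ray/tests/kuberay/test_autoscaling_config.py",
        "-k",
        "autoscaler-options",
    ]
)
```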