Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backward Compatibility][Spot] Avoid cluster leakage by ray yaml overwritten and reduce spot controller cost on AWS #1235

Merged
merged 10 commits into from
Oct 14, 2022
54 changes: 50 additions & 4 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from ray.autoscaler._private import util as ray_util
import rich.console as rich_console
import rich.progress as rich_progress
import yaml

import sky
from sky import authentication as auth
Expand Down Expand Up @@ -109,12 +110,22 @@
# Remote dir that holds our runtime files.
_REMOTE_RUNTIME_FILES_DIR = '~/.sky/.runtime_files'

_RAY_YAML_KEYS_TO_RESTORE_FOR_BACK_COMPATIBILITY = {'Ebs'}
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved


def is_ip(s: str) -> bool:
"""Returns whether this string matches IP_ADDR_REGEX."""
return len(re.findall(IP_ADDR_REGEX, s)) == 1


def _get_yaml_path_from_cluster_name(cluster_name: str,
prefix: str = SKY_USER_FILE_PATH) -> str:
output_path = pathlib.Path(
prefix).expanduser().resolve() / f'{cluster_name}.yml'
os.makedirs(output_path.parents[0], exist_ok=True)
return str(output_path)


def fill_template(template_name: str,
variables: Dict,
output_path: Optional[str] = None,
Expand All @@ -129,10 +140,8 @@ def fill_template(template_name: str,
if output_path is None:
assert ('cluster_name' in variables), ('cluster_name is required.')
cluster_name = variables.get('cluster_name')
output_path = pathlib.Path(
output_prefix).expanduser() / f'{cluster_name}.yml'
os.makedirs(output_path.parents[0], exist_ok=True)
output_path = str(output_path)
output_path = _get_yaml_path_from_cluster_name(cluster_name,
output_prefix)
output_path = os.path.abspath(output_path)

# Add yaml file path to the template variables.
Expand Down Expand Up @@ -655,6 +664,27 @@ def _remove_multinode_config(
break


def _restore_original_block_for_yaml(new_yaml: str, old_yaml: str,
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
key_names: Set[str]) -> str:
"""Restore the original block with key_name recursively."""
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved

def _restore_block(new_block: Dict[str, Any], old_block: Dict[str, Any]):
for key, value in new_block.items():
if key in key_names:
if key in old_block:
new_block[key] = old_block[key]
else:
del new_block[key]
elif isinstance(value, dict):
if key in old_block:
_restore_block(value, old_block[key])

new_config = yaml.safe_load(new_yaml)
old_config = yaml.safe_load(old_yaml)
_restore_block(new_config, old_config)
return common_utils.dump_yaml_str(new_config)


# TODO: too many things happening here - leaky abstraction. Refactor.
@timeline.event
def write_cluster_config(to_provision: 'resources.Resources',
Expand Down Expand Up @@ -700,6 +730,12 @@ def write_cluster_config(to_provision: 'resources.Resources',
auth_config = onprem_utils.get_local_auth_config(cluster_name)
region_name = resources_vars.get('region')

yaml_path = _get_yaml_path_from_cluster_name(cluster_name)
old_yaml_content = None
if os.path.exists(yaml_path):
with open(yaml_path, 'r') as f:
old_yaml_content = f.read()

yaml_path = fill_template(
cluster_config_template,
dict(
Expand Down Expand Up @@ -750,6 +786,16 @@ def write_cluster_config(to_provision: 'resources.Resources',
# been fully tested yet.
_optimize_file_mounts(yaml_path)

# Restore the old yaml content for backward compatibility.
if old_yaml_content is not None:
with open(yaml_path, 'r') as f:
new_yaml_content = f.read()
restored_yaml_content = _restore_original_block_for_yaml(
old_yaml_content, new_yaml_content,
_RAY_YAML_KEYS_TO_RESTORE_FOR_BACK_COMPATIBILITY)
with open(yaml_path, 'w') as f:
f.write(restored_yaml_content)

usage_lib.messages.usage.update_ray_yaml(yaml_path)
# For TPU nodes. TPU VMs do not need TPU_NAME.
if (resources_vars.get('tpu_type') is not None and
Expand Down
8 changes: 8 additions & 0 deletions sky/templates/aws-ray.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ available_node_types:
node_config:
InstanceType: {{instance_type}}
ImageId: {{image_id}} # Deep Learning AMI (Ubuntu 18.04); see aws.py.
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
BlockDeviceMappings:
- DeviceName: /dev/sda1
Ebs:
VolumeSize: {{disk_size}}
# use default Iops for gp3
VolumeType: gp3
Iops: 3000
Throughput: 125
{% if use_spot %}
InstanceMarketOptions:
MarketType: spot
Expand All @@ -49,6 +54,9 @@ available_node_types:
- DeviceName: /dev/sda1
Ebs:
VolumeSize: {{disk_size}}
VolumeType: gp3
Iops: 3000
Throughput: 125
{% if use_spot %}
InstanceMarketOptions:
MarketType: spot
Expand Down
1 change: 1 addition & 0 deletions sky/utils/common_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import time
import uuid
from typing import Dict, List, Union

import yaml

from sky import sky_logging
Expand Down