[AWS] Bring-your-own-VPC that disables public IPs for all SkyPilot nodes. (#1512)

* Minor: sky logs hint

* Minor: add a FIXME in authentication.py.

* New module: sky_config

* Backend changes for SSH proxy command support.

* spot_launch(): sync up config; pop any proxy command.

* AutostopEvent: monkey patch SSHOptions.

* aws/config.py: support vpc_name and new use_internal_ips semantics.

* Make failover catch our 'ERROR' messages from AWS node provider.

* .j2 changes.

* Fix local launch hash for workers: must pop ssh_proxy_command.

* Fix pylint.

* typo

* smoke: make printf usage safe.

* Use SKYPILOT_ERROR as logging prefix.

* Fix Resources.__repr__().

* Avoid printing unnecessary termination errors for VPC-not-found.

* Fix a syntax error in codegen.

* Read from SKYPILOT_CONFIG env var to permit dynamic generation.

* Fix smoke test name.

* Fix another test name

* Revert "Read from SKYPILOT_CONFIG env var to permit dynamic generation."

This reverts commit 0b982cd.

* Fix head_node_launch_requested log line.

* Optional: env var to read configs for spot, for better isolation.

* Make query_head_ip_with_retries() robust to extra output.

* aws/config.py: reword comments

* events.py: restart_only=True

* Fix Resources.__repr__ to handle None fields.

* Use SKYPILOT_ERROR_NO_NODES_LAUNCHED

* rstrip() for ssh config entries.

* authentication.py: reword comment

* pylint

* Fix logging

* Try using retries for handle.{internal,external}_ips().

* Address some easy comments

* Typo

* backend_utils: fix worker IPs fetch; fix >80-col lines.

* Fix test_minimal.

* test_smoke: printf -> echo

* Query IPs once.

* Drop ssh_proxy_command in launch hash when provisioning.

* Typo

* Typo

* Add comment

* sky/sky_config.py -> sky/skypilot_config.py

* Add: sky/backends/monkey_patches/

* Remove logging

* pylint

* MANIFEST should add monkey patch file

* tests/test_smoke.py: fix extra \n

* Fix monkey patching bug.

* Remove AutostopEvent monkey patching.

* _ray_launch_hash: pop ssh proxy command for head & workers

* Make another 'ray up' use patched launch hash fn.

* Fix smoke tests.

* Fix smoke: K80 VMs could be non-ssh-able (and are more costly).
concretevitamin authored Jan 4, 2023
1 parent 34e7fee commit 8d6f6a9
Showing 17 changed files with 871 additions and 253 deletions.
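
For orientation, the user-facing feature can be sketched as a SkyPilot config file (path assumed to be ~/.sky/config.yaml) along the following lines. This is an illustrative guess assembled from the diffs below (vpc_name and use_internal_ips from the AWS provider changes; ssh_proxy_command under 'auth' from the skypilot_config.pop_nested(('auth', 'ssh_proxy_command')) call in sky/execution.py); the exact schema that shipped may differ, and all values are hypothetical:

aws:
  vpc_name: my-existing-vpc    # reuse a bring-your-own VPC
  use_internal_ips: true       # SkyPilot nodes get no public IPs
auth:
  ssh_proxy_command: ssh -W %h:%p -i ~/.ssh/jump.pem ubuntu@jump-host.example.com  # hop via a jump host inside the VPC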
2 changes: 1 addition & 1 deletion examples/job_queue/cluster_multinode.yaml
@@ -9,6 +9,6 @@

 resources:
   cloud: aws
-  accelerators: K80
+  accelerators: T4

 num_nodes: 2
2 changes: 1 addition & 1 deletion examples/job_queue/job_multinode.yaml
@@ -10,7 +10,7 @@
 name: job_multinode

 resources:
-  accelerators: K80:0.5
+  accelerators: T4:0.5

 num_nodes: 2
7 changes: 6 additions & 1 deletion sky/authentication.py
@@ -77,7 +77,12 @@ def get_or_generate_keys() -> Tuple[str, str]:
         _save_key_pair(private_key_path, public_key_path, private_key,
                        public_key)
     else:
-        assert os.path.exists(public_key_path)
+        # FIXME(skypilot): ran into failing this assert once, but forgot the
+        # reproduction (has private key; but has not generated public key).
+        # AssertionError: /home/ubuntu/.ssh/sky-key.pub
+        assert os.path.exists(public_key_path), (
+            'Private key found, but associated public key '
+            f'{public_key_path} does not exist.')
     return private_key_path, public_key_path


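An aside on the FIXME above: if the private key exists but its public half is missing, one possible remedy (not part of this commit; a hedged sketch using only standard OpenSSH tooling) is to re-derive the public key instead of asserting:

import os
import subprocess


def regenerate_public_key(private_key_path: str, public_key_path: str) -> None:
    # 'ssh-keygen -y -f <private-key>' prints the matching public key to stdout.
    public_key = subprocess.run(
        ['ssh-keygen', '-y', '-f', os.path.expanduser(private_key_path)],
        check=True,
        capture_output=True,
        text=True).stdout
    with open(os.path.expanduser(public_key_path), 'w', encoding='utf-8') as f:
        f.write(public_key)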
275 changes: 181 additions & 94 deletions sky/backends/backend_utils.py

Large diffs are not rendered by default.

243 changes: 173 additions & 70 deletions sky/backends/cloud_vm_ray_backend.py

Large diffs are not rendered by default.

42 changes: 42 additions & 0 deletions (new file under sky/backends/monkey_patches/)
@@ -0,0 +1,42 @@
"""Runs `ray up` while not using ssh_proxy_command in launch hash.
This monkey patches the hash_launch_conf() function inside Ray autoscaler to
exclude any ssh_proxy_command in hash calculation.
Reasons:
- In the future, we want to support changing the ssh_proxy_command field for
an existing cluster. If the launch hash included this field, then this would
mean upon such a change a new cluster would've been launched, causing
leakage.
- With our patch, ssh_proxy_command will be excluded from the launch hash when
a cluster is first created. This then makes it possible for us to support
changing the proxy command in the future.
"""
import hashlib
import json
import os

from ray.autoscaler import sdk


# Ref: https://github.com/ray-project/ray/blob/releases/2.2.0/python/ray/autoscaler/_private/util.py#L392-L404
def monkey_patch_hash_launch_conf(node_conf, auth):
    hasher = hashlib.sha1()
    # For hashing, we replace the path to the key with the key
    # itself. This is to make sure the hashes are the same even if keys
    # live at different locations on different machines.
    full_auth = auth.copy()
    full_auth.pop('ssh_proxy_command', None)  # NOTE: skypilot changes.
    for key_type in ['ssh_private_key', 'ssh_public_key']:
        if key_type in auth:
            with open(os.path.expanduser(auth[key_type])) as key:
                full_auth[key_type] = key.read()
    hasher.update(
        json.dumps([node_conf, full_auth], sort_keys=True).encode('utf-8'))
    return hasher.hexdigest()


# Since hash_launch_conf is imported this way, we must patch this imported
# version.
sdk.sdk.commands.hash_launch_conf = monkey_patch_hash_launch_conf
sdk.create_or_update_cluster({ray_yaml_path}, **{ray_up_kwargs})
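
The {ray_yaml_path}/{ray_up_kwargs} placeholders indicate the file above is a template that the backend formats and executes in place of a plain `ray up`. A hedged sketch of how such a template could be consumed (illustrative only; the real call sites live in sky/backends/cloud_vm_ray_backend.py, whose diff is not rendered here, and the kwargs shown are assumptions):

import subprocess
import sys


def run_ray_up_with_patched_hash(patch_template: str, ray_yaml_path: str) -> None:
    # Substitute the placeholders, then run the resulting script so that
    # sdk.create_or_update_cluster() executes with the patched
    # hash_launch_conf (i.e., ssh_proxy_command excluded from the launch hash).
    script = patch_template.format(
        ray_yaml_path=repr(ray_yaml_path),
        ray_up_kwargs=repr({'no_restart': False}))
    subprocess.run([sys.executable, '-c', script], check=True)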
4 changes: 2 additions & 2 deletions sky/cli.py
@@ -1558,8 +1558,8 @@ def logs(

     if len(job_ids) > 1 and not sync_down:
         raise click.UsageError(
-            f'Cannot stream logs of multiple jobs {job_ids}. '
-            'Set --sync-down to download them.')
+            f'Cannot stream logs of multiple jobs (IDs: {", ".join(job_ids)}).'
+            '\nPass -s/--sync-down to download the logs instead.')

     job_ids = None if not job_ids else job_ids
66 changes: 54 additions & 12 deletions sky/execution.py
@@ -26,6 +26,7 @@
 from sky import exceptions
 from sky import global_user_state
 from sky import optimizer
+from sky import skypilot_config
 from sky import sky_logging
 from sky import spot
 from sky import task as task_lib
@@ -512,19 +513,60 @@ def spot_launch(
         common_utils.dump_yaml(f.name, task_config)

         controller_name = spot.SPOT_CONTROLLER_NAME
+        vars_to_fill = {
+            'remote_user_yaml_prefix': spot.SPOT_TASK_YAML_PREFIX,
+            'user_yaml_path': f.name,
+            'user_config_path': None,
+            'spot_controller': controller_name,
+            'cluster_name': name,
+            'gcloud_installation_commands': gcp.GCLOUD_INSTALLATION_COMMAND,
+            'is_dev': env_options.Options.IS_DEVELOPER.get(),
+            'disable_logging': env_options.Options.DISABLE_LOGGING.get(),
+            'logging_user_hash': common_utils.get_user_hash(),
+            'retry_until_up': retry_until_up,
+            'user': os.environ.get('USER', None),
+        }
+        if skypilot_config.loaded():
+            # Look up the contents of the already loaded configs via the
+            # 'skypilot_config' module. Don't simply read the on-disk file as
+            # it may have changed since this process started.
+            #
+            # Pop any proxy command, because the controller would've been
+            # launched behind the proxy, and in general any nodes we launch may
+            # not have or need the proxy setup. (If the controller needs to
+            # launch spot clusters in another region/VPC, the user should
+            # properly set up VPC peering, which will allow the
+            # cross-region/VPC communication. The proxy command is orthogonal
+            # to this scenario.)
+            #
+            # This file will be uploaded to the controller node and will be
+            # used throughout the spot job's recovery attempts (i.e., if it
+            # relaunches due to preemption, we make sure the same config is
+            # used).
+            #
+            # NOTE: suppose that we have a controller in old VPC, then user
+            # changes 'vpc_name' in the config and does a 'spot launch'. In
+            # general, the old controller may not successfully launch the job
+            # in the new VPC. This happens if the two VPCs don't have peering
+            # set up. Like other places in the code, we assume properly setting
+            # up networking is user's responsibilities.
+            # TODO(zongheng): consider adding a basic check that checks
+            # controller VPC (or name) == the spot job's VPC (or name). It may
+            # not be a sufficient check (as it's always possible that peering
+            # is not set up), but it may catch some obvious errors.
+            config_dict = skypilot_config.pop_nested(
+                ('auth', 'ssh_proxy_command'))
+            with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmpfile:
+                common_utils.dump_yaml(tmpfile.name, config_dict)
+            vars_to_fill.update({
+                'user_config_path': tmpfile.name,
+                'env_var_skypilot_config':
+                    skypilot_config.ENV_VAR_SKYPILOT_CONFIG,
+            })
+
         yaml_path = backend_utils.fill_template(
-            spot.SPOT_CONTROLLER_TEMPLATE, {
-                'remote_user_yaml_prefix': spot.SPOT_TASK_YAML_PREFIX,
-                'user_yaml_path': f.name,
-                'spot_controller': controller_name,
-                'cluster_name': name,
-                'gcloud_installation_commands': gcp.GCLOUD_INSTALLATION_COMMAND,
-                'is_dev': env_options.Options.IS_DEVELOPER.get(),
-                'disable_logging': env_options.Options.DISABLE_LOGGING.get(),
-                'logging_user_hash': common_utils.get_user_hash(),
-                'retry_until_up': retry_until_up,
-                'user': os.environ.get('USER', None),
-            },
+            spot.SPOT_CONTROLLER_TEMPLATE,
+            vars_to_fill,
             output_prefix=spot.SPOT_CONTROLLER_YAML_PREFIX)
         controller_task = task_lib.Task.from_yaml(yaml_path)
         controller_task.spot_task = task
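skypilot_config.pop_nested() above comes from the new sky/skypilot_config.py module, whose diff is not shown on this page. A minimal sketch of the behavior the call site implies (return a copy of the loaded config with one nested key chain removed), written here as a standalone function over an explicit dict since the real helper presumably operates on the module's already loaded config; purely an illustration, not the actual implementation:

import copy
from typing import Any, Dict, Tuple


def pop_nested(config: Dict[str, Any], keys: Tuple[str, ...]) -> Dict[str, Any]:
    # Deep-copy so the caller's dict is untouched, then drop
    # config[keys[0]]...[keys[-1]] if that path exists.
    result = copy.deepcopy(config)
    curr = result
    for key in keys[:-1]:
        if not isinstance(curr, dict) or key not in curr:
            return result  # Path absent; nothing to pop.
        curr = curr[key]
    if isinstance(curr, dict):
        curr.pop(keys[-1], None)
    return result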
43 changes: 41 additions & 2 deletions sky/resources.py
@@ -98,6 +98,29 @@ def __init__(
         self._try_validate_image_id()

     def __repr__(self) -> str:
+        """Returns a string representation for display.
+
+        Examples:
+
+            >>> sky.Resources(accelerators='V100')
+            <Cloud>({'V100': 1})
+
+            >>> sky.Resources(accelerators='V100', use_spot=True)
+            <Cloud>([Spot], {'V100': 1})
+
+            >>> sky.Resources(accelerators='V100',
+            ...               use_spot=True, instance_type='p3.2xlarge')
+            AWS(p3.2xlarge[Spot], {'V100': 1})
+
+            >>> sky.Resources(accelerators='V100', instance_type='p3.2xlarge')
+            AWS(p3.2xlarge, {'V100': 1})
+
+            >>> sky.Resources(instance_type='p3.2xlarge')
+            AWS(p3.2xlarge, {'V100': 1})
+
+            >>> sky.Resources(disk_size=100)
+            <Cloud>(disk_size=100)
+        """
         accelerators = ''
         accelerator_args = ''
         if self.accelerators is not None:
@@ -123,8 +146,24 @@ def __repr__(self) -> str:
         if self.disk_size != _DEFAULT_DISK_SIZE_GB:
             disk_size = f', disk_size={self.disk_size}'

-        return (f'{self.cloud}({self._instance_type}{use_spot}'
-                f'{accelerators}{accelerator_args}{image_id}{disk_size})')
+        if self._instance_type is not None:
+            instance_type = f'{self._instance_type}'
+        else:
+            instance_type = ''
+
+        hardware_str = (
+            f'{instance_type}{use_spot}'
+            f'{accelerators}{accelerator_args}{image_id}{disk_size}')
+        # It may have leading ',' (for example, instance_type not set) or empty
+        # spaces. Remove them.
+        while hardware_str and hardware_str[0] in (',', ' '):
+            hardware_str = hardware_str[1:]
+
+        cloud_str = '<Cloud>'
+        if self.cloud is not None:
+            cloud_str = f'{self.cloud}'
+
+        return f'{cloud_str}({hardware_str})'

     @property
     def cloud(self):
1 change: 1 addition & 0 deletions sky/setup_files/MANIFEST.in
@@ -1,3 +1,4 @@
+include sky/backends/monkey_patches/*.py
 include sky/skylet/*.sh
 include sky/skylet/providers/aws/*
 include sky/skylet/providers/aws/cloudwatch/*
18 changes: 18 additions & 0 deletions sky/skylet/events.py
@@ -130,18 +130,24 @@ def _stop_cluster(self, autostop_config):
                cloud_vm_ray_backend.CloudVmRayBackend.NAME):
            self._replace_yaml_for_stopping(self._ray_yaml_path,
                                            autostop_config.down)

            # `ray up` is required to reset the upscaling speed and min/max
            # workers. Otherwise, `ray down --workers-only` will continuously
            # scale down and up.
            logger.info('Running ray up.')
            subprocess.run([
                'ray', 'up', '-y', '--restart-only', '--disable-usage-stats',
                self._ray_yaml_path
            ],
                           check=True)

            logger.info('Running ray down.')
            # Stop the workers first to avoid orphan workers.
            subprocess.run(
                ['ray', 'down', '-y', '--workers-only', self._ray_yaml_path],
                check=True)

            logger.info('Running final ray down.')
            subprocess.run(['ray', 'down', '-y', self._ray_yaml_path],
                           check=True)
        else:
@@ -160,6 +166,18 @@ def _replace_yaml_for_stopping(self, yaml_path: str, down: bool):
         config = yaml.safe_load(yaml_str)
         # Set the private key with the existed key on the remote instance.
         config['auth']['ssh_private_key'] = '~/ray_bootstrap_key.pem'
+        # NOTE: We must do this, otherwise with ssh_proxy_command still under
+        # 'auth:', `ray up ~/.sky/sky_ray.yaml` on the head node will fail (in
+        # general, the clusters do not need or have the proxy set up).
+        #
+        # Note also that this is ok only because in the local client ->
+        # provision head node code path, we have monkey patched
+        # hash_launch_conf() to exclude ssh_proxy_command from the hash
+        # calculation for the head node. Therefore when this current code is
+        # run again on the head, the hash would match the one at head's
+        # creation (otherwise the head node would be stopped and a new one
+        # would be launched).
+        config['auth'].pop('ssh_proxy_command', None)
         # Empty the file_mounts.
         config['file_mounts'] = dict()
         common_utils.dump_yaml(yaml_path, config)
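To make the YAML rewrite above concrete, a hedged before/after of the relevant fields of the on-head cluster YAML (only the ssh_private_key replacement, the ssh_proxy_command removal, and the emptied file_mounts come from the code above; all other names and values are hypothetical):

# Before (as provisioned from the client machine):
auth:
  ssh_user: ubuntu
  ssh_private_key: ~/.ssh/sky-key
  ssh_proxy_command: ssh -W %h:%p jump-user@jump-host
file_mounts:
  /remote/path: /local/path

# After _replace_yaml_for_stopping():
auth:
  ssh_user: ubuntu
  ssh_private_key: ~/ray_bootstrap_key.pem
file_mounts: {}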