[Spot] Show spot controller in sky status and simplify tearing down (skypilot-org#1270)

* Show controller in status and enable tear down.

* Reduce the autostop for controller to save cost

* fix test

* format

* address comments

* fix down output

* fix test

* address comments

* Fix the docs for spot job

* fix space

* address comments
Michaelvll authored and ewzeng committed Oct 24, 2022
1 parent 09b7720 commit 1dc2515
Showing 7 changed files with 142 additions and 44 deletions.
8 changes: 4 additions & 4 deletions docs/source/examples/spot-jobs.rst
@@ -206,13 +206,13 @@ Spot controller (Advanced)
-------------------------------

There will be a single spot controller VM (a small on-demand CPU VM) running in the background to manage all the spot jobs.
It will be autostopped after all spot jobs finished and no new spot job is submitted for 30 minutes. Typically **no user intervention** is needed.
You can find the controller with :code:`sky status -a`, and refresh the status with :code:`sky status -ar`.
It will be autostopped after all spot jobs finished and no new spot job is submitted for 10 minutes. Typically **no user intervention** is needed.
You can find the controller with :code:`sky status`, and refresh the status with :code:`sky status -r`.

Although, the cost of the spot controller is negligible (~$0.4/hour when running and less than $0.004/hour when stopped),
you can still tear it down manually with
:code:`sky down -p sky-spot-controller-<hash>`, where the ``<hash>`` can be found in the output of :code:`sky status -a`.
:code:`sky down <spot-controller-name>`, where the ``<spot-controller-name>`` can be found in the output of :code:`sky status`.

.. note::
Tearing down the spot controller when there are still spot jobs running will cause resource leakage of those spot VMs.
Tearing down the spot controller will lose all logs and status information for the spot jobs and can cause resource leakage when there are still in-progress spot jobs.
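
The updated docs above describe the CLI flow; the same workflow is reachable through the Python entrypoints this commit touches (core.status / core.down). A minimal sketch, assuming the import paths used in tests/test_spot.py; this is an illustration, not the repo's documented API surface.

# Sketch of the documented workflow via the Python API changed in this
# commit; import paths are an assumption based on tests/test_spot.py.
from sky import core
from sky import spot  # assumed to expose SPOT_CONTROLLER_NAME

# Roughly `sky status -r`: refresh and list clusters, including the
# reserved spot controller.
for record in core.status(refresh=True):
    print(record['name'], record['status'])

# Roughly `sky down <spot-controller-name>`: tear the controller down once
# no spot jobs are in progress (logs and status of spot jobs are lost).
core.down(spot.SPOT_CONTROLLER_NAME)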

16 changes: 9 additions & 7 deletions sky/backends/backend_utils.py
@@ -101,7 +101,11 @@
# Note: This value cannot be too small, otherwise OOM issue may occur.
DEFAULT_TASK_CPU_DEMAND = 0.5

SKY_RESERVED_CLUSTER_NAMES = [spot_lib.SPOT_CONTROLLER_NAME]
# Mapping from reserved cluster names to the corresponding group name (logging purpose).
# NOTE: each group can only have one reserved cluster name for now.
SKY_RESERVED_CLUSTER_NAMES = {
spot_lib.SPOT_CONTROLLER_NAME: 'Managed spot controller'
}

# Filelocks for the cluster status change.
CLUSTER_STATUS_LOCK_PATH = os.path.expanduser('~/.sky/.{}.lock')
@@ -1927,13 +1931,11 @@ def check_cluster_name_not_reserved(
If the cluster name is reserved, return the error message. Otherwise,
return None.
"""
usage = 'internal use'
if cluster_name == spot_lib.SPOT_CONTROLLER_NAME:
usage = 'spot controller'
msg = f'Cluster {cluster_name!r} is reserved for {usage}.'
if operation_str is not None:
msg += f' {operation_str} is not allowed.'
if cluster_name in SKY_RESERVED_CLUSTER_NAMES:
msg = (f'Cluster {cluster_name!r} is reserved for '
f'{SKY_RESERVED_CLUSTER_NAMES[cluster_name].lower()}.')
if operation_str is not None:
msg += f' {operation_str} is not allowed.'
with ux_utils.print_exception_no_traceback():
raise ValueError(msg)

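The new SKY_RESERVED_CLUSTER_NAMES mapping is what drives the error message in check_cluster_name_not_reserved above. A self-contained sketch of that lookup pattern, with illustrative names rather than the repo's actual values:

# Sketch of the mapping-driven reserved-name check added above.
# Names and messages are illustrative, not the repo's actual values.
from typing import Optional

RESERVED_CLUSTER_NAMES = {
    'example-controller': 'Example controller',
}

def check_name_not_reserved(cluster_name: str,
                            operation_str: Optional[str] = None) -> None:
    if cluster_name in RESERVED_CLUSTER_NAMES:
        msg = (f'Cluster {cluster_name!r} is reserved for '
               f'{RESERVED_CLUSTER_NAMES[cluster_name].lower()}.')
        if operation_str is not None:
            msg += f' {operation_str} is not allowed.'
        raise ValueError(msg)

check_name_not_reserved('my-cluster')  # passes: not a reserved name
check_name_not_reserved('example-controller', 'Launching')  # raises ValueError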
95 changes: 84 additions & 11 deletions sky/cli.py
@@ -1279,10 +1279,32 @@ def status(all: bool, refresh: bool): # pylint: disable=redefined-builtin
'(down)', e.g. '1m (down)', the cluster will be autodowned, rather than
autostopped.
"""
cluster_records = core.status(all=all, refresh=refresh)
cluster_records = core.status(refresh=refresh)
nonreserved_cluster_records = []
reserved_clusters = dict()
for cluster_record in cluster_records:
cluster_name = cluster_record['name']
if cluster_name in backend_utils.SKY_RESERVED_CLUSTER_NAMES:
cluster_group_name = backend_utils.SKY_RESERVED_CLUSTER_NAMES[
cluster_name]
reserved_clusters[cluster_group_name] = cluster_record
else:
nonreserved_cluster_records.append(cluster_record)
local_clusters = onprem_utils.check_and_get_local_clusters(
suppress_error=True)
status_utils.show_status_table(cluster_records, all)

num_pending_autostop = 0
num_pending_autostop += status_utils.show_status_table(
nonreserved_cluster_records, all)
for cluster_group_name, cluster_record in reserved_clusters.items():
num_pending_autostop += status_utils.show_status_table(
[cluster_record], all, reserved_group_name=cluster_group_name)
if num_pending_autostop > 0:
click.echo(
'\n'
f'{colorama.Style.DIM}You have {num_pending_autostop} clusters '
'with auto{stop,down} scheduled. Refresh statuses with: '
f'sky status --refresh{colorama.Style.RESET_ALL}')
status_utils.show_local_status_table(local_clusters)


@@ -1853,6 +1875,50 @@ def down(
purge=purge)


def _hint_for_down_spot_controller(controller_name: str):
# spot_jobs will be empty when the spot cluster is not running.
cluster_status, _ = backend_utils.refresh_cluster_status_handle(
controller_name)
if cluster_status is None:
click.echo('Managed spot controller has already been torn down.')
return

msg = (f'{colorama.Fore.YELLOW}WARNING: Tearing down the managed '
f'spot controller ({cluster_status.value}). Please be '
f'aware of the following:{colorama.Style.RESET_ALL}'
'\n * All logs and status information of the spot '
'jobs (output of sky spot status) will be lost.')
if cluster_status == global_user_state.ClusterStatus.UP:
try:
spot_jobs = core.spot_status(refresh=False)
except exceptions.ClusterNotUpError:
# The spot controller cluster status changed during querying
# the spot jobs, use the latest cluster status, so that the
# message for INIT and STOPPED states will be correctly
# added to the message.
cluster_status = backend_utils.refresh_cluster_status_handle(
controller_name)
spot_jobs = []

# Find in-progress spot jobs, and hint users to cancel them.
non_terminal_jobs = [
job for job in spot_jobs if not job['status'].is_terminal()
]
if (cluster_status == global_user_state.ClusterStatus.UP and
non_terminal_jobs):
msg += ('\n * In-progress spot jobs found, their resources '
'will not be terminated and require manual cleanup:\n')
job_table = spot_lib.format_job_table(non_terminal_jobs,
show_all=False)
# Add prefix to each line to align with the bullet point.
msg += '\n'.join(
[' ' + line for line in job_table.split('\n') if line != ''])
if cluster_status == global_user_state.ClusterStatus.INIT:
msg += ('\n * If there are pending/in-progress spot jobs, those '
'resources will not be terminated and require manual cleanup.')
click.echo(msg)


def _down_or_stop_clusters(
names: Tuple[str],
apply_to_all: Optional[bool],
@@ -1903,23 +1969,30 @@ def _down_or_stop_clusters(
'`sky stop/autostop`.'))
]
# Make sure the reserved clusters are explicitly specified without other
# normal clusters and purge is True.
# normal clusters.
if len(reserved_clusters) > 0:
if not purge:
msg = (f'{operation} reserved cluster(s) '
f'{reserved_clusters_str} is not supported.')
if down:
msg += (
'\nPlease specify --purge (-p) to force-terminate the '
'reserved cluster(s).')
raise click.UsageError(msg)
if len(names) != 0:
names_str = ', '.join(map(repr, names))
raise click.UsageError(
f'{operation} reserved cluster(s) '
f'{reserved_clusters_str} with multiple other cluster(s) '
f'{names_str} is not supported.\n'
f'Please omit the reserved cluster(s) {reserved_clusters}.')
if not down:
raise click.UsageError(
f'{operation} reserved cluster(s) '
f'{reserved_clusters_str} is not supported.')
else:
# TODO(zhwu): We can only have one reserved cluster (spot
# controller).
assert len(reserved_clusters) == 1, reserved_clusters
_hint_for_down_spot_controller(reserved_clusters[0])

click.confirm('Proceed?',
default=False,
abort=True,
show_default=True)
no_confirm = True
names += reserved_clusters

if apply_to_all:
9 changes: 3 additions & 6 deletions sky/core.py
@@ -30,7 +30,7 @@


@usage_lib.entrypoint
def status(all: bool, refresh: bool) -> List[Dict[str, Any]]:
def status(refresh: bool) -> List[Dict[str, Any]]:
"""Get the cluster status in dict.
Please refer to the sky.cli.status for the document.
@@ -52,7 +52,8 @@ def status(refresh: bool) -> List[Dict[str, Any]]:
]
"""
cluster_records = backend_utils.get_clusters(all, refresh)
cluster_records = backend_utils.get_clusters(include_reserved=True,
refresh=refresh)
return cluster_records


@@ -167,10 +168,6 @@ def down(cluster_name: str, purge: bool = False):
ValueError: cluster does not exist.
sky.exceptions.NotSupportedError: the cluster is not supported.
"""
if (cluster_name in backend_utils.SKY_RESERVED_CLUSTER_NAMES and not purge):
raise exceptions.NotSupportedError(
f'Tearing down sky reserved cluster {cluster_name!r} '
f'is not supported.')
handle = global_user_state.get_handle_from_cluster_name(cluster_name)
if handle is None:
raise ValueError(f'Cluster {cluster_name!r} does not exist.')
2 changes: 1 addition & 1 deletion sky/spot/constants.py
@@ -1,6 +1,6 @@
"""Constants used for Managed Spot."""

SPOT_CONTROLLER_IDLE_MINUTES_TO_AUTOSTOP = 30
SPOT_CONTROLLER_IDLE_MINUTES_TO_AUTOSTOP = 10

SPOT_CONTROLLER_TEMPLATE = 'spot-controller.yaml.j2'
SPOT_CONTROLLER_YAML_PREFIX = '~/.sky/spot_controller'
27 changes: 19 additions & 8 deletions sky/utils/cli_utils/status_utils.py
@@ -1,5 +1,5 @@
"""Utilities for sky status."""
from typing import Any, Callable, Dict, List
from typing import Any, Callable, Dict, List, Optional
import click
import colorama

@@ -32,8 +32,14 @@ def calc(self, record):
return val


def show_status_table(cluster_records: List[Dict[str, Any]], show_all: bool):
"""Compute cluster table values and display."""
def show_status_table(cluster_records: List[Dict[str, Any]],
show_all: bool,
reserved_group_name: Optional[str] = None) -> int:
"""Compute cluster table values and display.
Returns:
Number of pending auto{stop,down} clusters.
"""
# TODO(zhwu): Update the information for autostop clusters.

status_columns = [
@@ -68,14 +74,19 @@ def show_status_table(cluster_records: List[Dict[str, Any]], show_all: bool):
pending_autostop += _is_pending_autostop(record)

if cluster_records:
click.echo(cluster_table)
if pending_autostop:
if reserved_group_name is not None:
click.echo(
'\n'
f'You have {pending_autostop} clusters with auto{{stop,down}} '
'scheduled. Refresh statuses with: `sky status --refresh`.')
f'\n{colorama.Fore.CYAN}{colorama.Style.BRIGHT}'
f'{reserved_group_name}: {colorama.Style.RESET_ALL}'
f'{colorama.Style.DIM}(will be autostopped if idle for 30min)'
f'{colorama.Style.RESET_ALL}')
else:
click.echo(f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}Clusters: '
f'{colorama.Style.RESET_ALL}')
click.echo(cluster_table)
else:
click.echo('No existing clusters.')
return pending_autostop


def show_local_status_table(local_clusters: List[str]):
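With this change, show_status_table returns the number of clusters with pending auto{stop,down} instead of printing the hint itself, and callers aggregate the counts across tables. A rough sketch of that calling pattern, mirroring the updated `sky status` command; `records` and `controller_record` are placeholders for illustration.

# Sketch of aggregating the new return value across tables; `records` and
# `controller_record` are placeholder variables, not repo names.
num_pending_autostop = 0
num_pending_autostop += status_utils.show_status_table(records, show_all=False)
num_pending_autostop += status_utils.show_status_table(
    [controller_record], show_all=False,
    reserved_group_name='Managed spot controller')
if num_pending_autostop:
    print(f'{num_pending_autostop} clusters have auto{{stop,down}} scheduled; '
          'refresh with `sky status --refresh`.')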
29 changes: 22 additions & 7 deletions tests/test_spot.py
@@ -94,14 +94,29 @@ def _mock_cluster_state(self, _mock_db_conn):
ready=True)

@pytest.mark.timeout(60)
def test_down_spot_controller(self, _mock_cluster_state):
cli_runner = cli_testing.CliRunner()
def test_down_spot_controller(self, _mock_cluster_state, monkeypatch):

result = cli_runner.invoke(cli.down, [spot.SPOT_CONTROLLER_NAME])
assert result.exit_code == click.UsageError.exit_code
assert (
f'Terminating reserved cluster(s) \'{spot.SPOT_CONTROLLER_NAME}\' '
'is not supported' in result.output)
def mock_cluster_refresh_up(
cluster_name: str,
*,
force_refresh: bool = False,
acquire_per_cluster_status_lock: bool = True,
):
record = global_user_state.get_cluster_from_name(cluster_name)
return record['status'], record['handle']

monkeypatch.setattr(
'sky.backends.backend_utils.refresh_cluster_status_handle',
mock_cluster_refresh_up)

monkeypatch.setattr('sky.core.spot_status', lambda refresh: [])

cli_runner = cli_testing.CliRunner()
result = cli_runner.invoke(cli.down, [spot.SPOT_CONTROLLER_NAME],
input='n')
assert 'WARNING: Tearing down the managed spot controller (UP).' in result.output
assert isinstance(result.exception,
SystemExit), (result.exception, result.output)

result = cli_runner.invoke(cli.down, ['sky-spot-con*'])
assert not result.exception
