[Spot] Show spot controller in sky status and simplify tearing down #1270

Merged 13 commits on Oct 21, 2022
4 changes: 3 additions & 1 deletion sky/backends/backend_utils.py
@@ -101,7 +101,9 @@
# Note: This value cannot be too small, otherwise OOM issue may occur.
DEFAULT_TASK_CPU_DEMAND = 0.5

SKY_RESERVED_CLUSTER_NAMES = [spot_lib.SPOT_CONTROLLER_NAME]
SKY_RESERVED_CLUSTER_NAMES = {
spot_lib.SPOT_CONTROLLER_NAME: 'Managed spot controller'
}

# Filelocks for the cluster status change.
CLUSTER_STATUS_LOCK_PATH = os.path.expanduser('~/.sky/.{}.lock')
89 changes: 79 additions & 10 deletions sky/cli.py
@@ -27,6 +27,7 @@
listed in "sky --help". Take care to put logically connected commands close to
each other.
"""
import collections
import datetime
import functools
import getpass
@@ -1279,10 +1280,32 @@ def status(all: bool, refresh: bool): # pylint: disable=redefined-builtin
'(down)', e.g. '1m (down)', the cluster will be autodowned, rather than
autostopped.
"""
cluster_records = core.status(all=all, refresh=refresh)
cluster_records = core.status(refresh=refresh)
nonreserved_cluster_records = []
reserved_clusters = collections.defaultdict(list)
Member: Is it possible to simplify this group-based logic? Maybe specialize it to just the one spot controller?

Collaborator Author: Sounds good. Changed each group to only have 1 cluster.
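For concreteness, here is a toy sketch of the resulting one-cluster-per-group behavior (the controller name and cluster records below are placeholders, not SkyPilot's real values; the real mapping lives in sky/backends/backend_utils.py and the real records come from core.status()):

import collections

# Placeholder values for illustration only.
SKY_RESERVED_CLUSTER_NAMES = {'sky-spot-controller-abcd': 'Managed spot controller'}
cluster_records = [{'name': 'sky-spot-controller-abcd'}, {'name': 'my-cluster'}]

nonreserved_cluster_records = []
reserved_clusters = collections.defaultdict(list)
for record in cluster_records:
    group_name = SKY_RESERVED_CLUSTER_NAMES.get(record['name'])
    if group_name is not None:
        # Each reserved group ends up holding exactly one cluster record.
        reserved_clusters[group_name].append(record)
    else:
        nonreserved_cluster_records.append(record)

assert reserved_clusters == {'Managed spot controller': [{'name': 'sky-spot-controller-abcd'}]}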

for cluster_record in cluster_records:
cluster_name = cluster_record['name']
if cluster_name in backend_utils.SKY_RESERVED_CLUSTER_NAMES:
cluster_group_name = backend_utils.SKY_RESERVED_CLUSTER_NAMES[
cluster_name]
reserved_clusters[cluster_group_name].append(cluster_record)
else:
nonreserved_cluster_records.append(cluster_record)
local_clusters = onprem_utils.check_and_get_local_clusters(
suppress_error=True)
status_utils.show_status_table(cluster_records, all)

num_pending_autostop = 0
num_pending_autostop += status_utils.show_status_table(
nonreserved_cluster_records, all)
for cluster_group_name, cluster_records in reserved_clusters.items():
num_pending_autostop += status_utils.show_status_table(
cluster_records, all, reserved_group_name=cluster_group_name)
if num_pending_autostop > 0:
click.echo(
'\n'
f'{colorama.Style.DIM}You have {num_pending_autostop} clusters '
'with auto{stop,down} scheduled. Refresh statuses with: '
f'sky status --refresh{colorama.Style.RESET_ALL}')
status_utils.show_local_status_table(local_clusters)


@@ -1905,21 +1928,67 @@ def _down_or_stop_clusters(
# Make sure the reserved clusters are explicitly specified without other
# normal clusters and purge is True.
if len(reserved_clusters) > 0:
if not purge:
msg = (f'{operation} reserved cluster(s) '
f'{reserved_clusters_str} is not supported.')
if down:
msg += (
'\nPlease specify --purge (-p) to force-terminate the '
'reserved cluster(s).')
raise click.UsageError(msg)
if len(names) != 0:
names_str = ', '.join(map(repr, names))
raise click.UsageError(
f'{operation} reserved cluster(s) '
f'{reserved_clusters_str} with multiple other cluster(s) '
f'{names_str} is not supported.\n'
f'Please omit the reserved cluster(s) {reserved_clusters}.')
if not down:
raise click.UsageError(
f'{operation} reserved cluster(s) '
f'{reserved_clusters_str} is not supported.')
else:
# TODO(zhwu): We can only have one reserved cluster (spot
# controller).
assert len(reserved_clusters) == 1, reserved_clusters
# spot_jobs will be empty when the spot cluster is not running.
cluster_name = reserved_clusters[0]
cluster_status, _ = backend_utils.refresh_cluster_status_handle(
cluster_name)
if cluster_status is None:
click.echo(
'Managed spot controller has already been torn down.')
return

cnt = 1
msg = (
f'{colorama.Fore.YELLOW}WARNING: Tearing down the managed '
f'spot controller ({cluster_status.value}). Please be '
f'aware of the following:{colorama.Style.RESET_ALL}'
f'\n {cnt}. All logs and status information of the spot '
'jobs will be lost.')
cnt += 1
if cluster_status == global_user_state.ClusterStatus.INIT:
msg += (
f'\n {cnt}. Resource leakage may happen caused by '
'spot jobs being submitted, and in-progress spot jobs.')
Member: nit: If there are pending/in-progress spot jobs, those resources will not be terminated and will require manual cleanup. Also, why do we show this when the controller is INIT?

Collaborator Author: Because another `sky spot launch` may be running in parallel, which puts the cluster status in INIT, i.e. while a spot job is being submitted to the spot controller.

cnt += 1
elif cluster_status == global_user_state.ClusterStatus.UP:
spot_jobs = core.spot_status(refresh=False)
Member: What if, before this line, a concurrent spot launch made the controller INIT? Would this call fail? The docstring of this function doesn't say what to expect.

Collaborator Author: Good point! Added error handling for it. PTAL.
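A minimal sketch of the kind of guard that reply could describe (hypothetical: the helper name, exception type, and message are assumptions, not this PR's actual change):

import click
from sky import core, exceptions

def _try_get_spot_jobs():
    """Hypothetical helper: return the spot job list, or None if the controller
    cannot be queried (e.g. a concurrent `sky spot launch` flipped it to INIT)."""
    try:
        return core.spot_status(refresh=False)
    except exceptions.ClusterNotUpError:
        # Assumed exception type; the real handling in the PR may differ.
        click.echo('Spot controller is not accessible; cannot list in-progress '
                   'spot jobs right now.')
        return None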

non_terminal_jobs = [
job for job in spot_jobs
if not job['status'].is_terminal()
]
if non_terminal_jobs:
msg += (
f'\n {cnt}. Resource leakage may happen caused by '
'the following in-progress spot jobs:\n')
job_table = spot_lib.format_job_table(non_terminal_jobs,
show_all=False)
msg += '\n'.join([
' ' + line
for line in job_table.split('\n')
if line != ''
])
click.echo(msg)

click.confirm('Do you want to continue?',
default=False,
abort=True,
show_default=True)
no_confirm = True
names += reserved_clusters

if apply_to_all:
9 changes: 3 additions & 6 deletions sky/core.py
@@ -30,7 +30,7 @@


@usage_lib.entrypoint
def status(all: bool, refresh: bool) -> List[Dict[str, Any]]:
def status(refresh: bool) -> List[Dict[str, Any]]:
"""Get the cluster status in dict.

Please refer to the sky.cli.status for the document.
@@ -52,7 +52,8 @@ def status(all: bool, refresh: bool) -> List[Dict[str, Any]]:
]

"""
cluster_records = backend_utils.get_clusters(all, refresh)
cluster_records = backend_utils.get_clusters(include_reserved=True,
refresh=refresh)
return cluster_records


@@ -167,10 +168,6 @@ def down(cluster_name: str, purge: bool = False):
ValueError: cluster does not exist.
sky.exceptions.NotSupportedError: the cluster is not supported.
"""
if (cluster_name in backend_utils.SKY_RESERVED_CLUSTER_NAMES and not purge):
raise exceptions.NotSupportedError(
f'Tearing down sky reserved cluster {cluster_name!r} '
f'is not supported.')
handle = global_user_state.get_handle_from_cluster_name(cluster_name)
if handle is None:
raise ValueError(f'Cluster {cluster_name!r} does not exist.')
2 changes: 1 addition & 1 deletion sky/spot/constants.py
@@ -1,6 +1,6 @@
"""Constants used for Managed Spot."""

SPOT_CONTROLLER_IDLE_MINUTES_TO_AUTOSTOP = 30
SPOT_CONTROLLER_IDLE_MINUTES_TO_AUTOSTOP = 10

SPOT_CONTROLLER_TEMPLATE = 'spot-controller.yaml.j2'
SPOT_CONTROLLER_YAML_PREFIX = '~/.sky/spot_controller'
27 changes: 19 additions & 8 deletions sky/utils/cli_utils/status_utils.py
@@ -1,5 +1,5 @@
"""Utilities for sky status."""
from typing import Any, Callable, Dict, List
from typing import Any, Callable, Dict, List, Optional
import click
import colorama

@@ -32,8 +32,14 @@ def calc(self, record):
return val


def show_status_table(cluster_records: List[Dict[str, Any]], show_all: bool):
"""Compute cluster table values and display."""
def show_status_table(cluster_records: List[Dict[str, Any]],
show_all: bool,
reserved_group_name: Optional[str] = None) -> int:
"""Compute cluster table values and display.

Returns:
Number of pending autostop clusters.
"""
# TODO(zhwu): Update the information for autostop clusters.

status_columns = [
@@ -68,14 +74,19 @@ def show_status_table(cluster_records: List[Dict[str, Any]], show_all: bool):
pending_autostop += _is_pending_autostop(record)

if cluster_records:
click.echo(cluster_table)
if pending_autostop:
if reserved_group_name is not None:
click.echo(
'\n'
f'You have {pending_autostop} clusters with auto{{stop,down}} '
'scheduled. Refresh statuses with: `sky status --refresh`.')
f'\n{colorama.Fore.CYAN}{colorama.Style.BRIGHT}'
f'{reserved_group_name}: {colorama.Style.RESET_ALL}'
f'{colorama.Style.DIM}(will be autostopped when inactive)'
f'{colorama.Style.RESET_ALL}')
else:
click.echo(f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}Clusters: '
f'{colorama.Style.RESET_ALL}')
click.echo(cluster_table)
else:
click.echo('No existing clusters.')
return pending_autostop


def show_local_status_table(local_clusters: List[str]):
29 changes: 22 additions & 7 deletions tests/test_spot.py
@@ -94,14 +94,29 @@ def _mock_cluster_state(self, _mock_db_conn):
ready=True)

@pytest.mark.timeout(60)
def test_down_spot_controller(self, _mock_cluster_state):
cli_runner = cli_testing.CliRunner()
def test_down_spot_controller(self, _mock_cluster_state, monkeypatch):

result = cli_runner.invoke(cli.down, [spot.SPOT_CONTROLLER_NAME])
assert result.exit_code == click.UsageError.exit_code
assert (
f'Terminating reserved cluster(s) \'{spot.SPOT_CONTROLLER_NAME}\' '
'is not supported' in result.output)
def mock_cluster_refresh_up(
cluster_name: str,
*,
force_refresh: bool = False,
acquire_per_cluster_status_lock: bool = True,
):
record = global_user_state.get_cluster_from_name(cluster_name)
return record['status'], record['handle']

monkeypatch.setattr(
'sky.backends.backend_utils.refresh_cluster_status_handle',
mock_cluster_refresh_up)

monkeypatch.setattr('sky.core.spot_status', lambda refresh: [])

cli_runner = cli_testing.CliRunner()
result = cli_runner.invoke(cli.down, [spot.SPOT_CONTROLLER_NAME],
input='n')
assert 'WARNING: Tearing down the managed spot controller (UP).' in result.output
assert isinstance(result.exception,
SystemExit), (result.exception, result.output)

result = cli_runner.invoke(cli.down, ['sky-spot-con*'])
assert not result.exception