Skip to content

Commit

Permalink
[autostop] Support restarting the autostop timer. (skypilot-org#1458)
Browse files Browse the repository at this point in the history
* [autostop] Support restarting the autostop timer.

* Logging

* Make each job submission call set_active_time_to_now().

* Fix test and pylint.

* Fix comments.

* Change tests; some fixes

* logging remnant

* remnant
  • Loading branch information
concretevitamin authored and Sumanth committed Jan 15, 2023
1 parent 74ddf76 commit ab9a802
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 39 deletions.
17 changes: 14 additions & 3 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ def add_prologue(self,
import ray
import ray.util as ray_util
from sky.skylet import autostop_lib
from sky.skylet import constants
from sky.skylet import job_lib
from sky.utils import log_utils
Expand All @@ -202,8 +203,18 @@ def add_prologue(self,
inspect.getsource(log_lib.add_ray_env_vars),
inspect.getsource(log_lib.run_bash_command_with_log),
'run_bash_command_with_log = ray.remote(run_bash_command_with_log)',
f'setup_cmd = {setup_cmd!r}'
f'setup_cmd = {setup_cmd!r}',
]
# Currently, the codegen program is/can only be submitted to the head
# node, due to using job_lib for updating job statuses, and using
# autostop_lib here.
self._code.append(
# Use hasattr to handle backward compatibility.
# TODO(zongheng): remove in ~1-2 minor releases (currently 0.2.x).
textwrap.dedent("""\
if hasattr(autostop_lib, 'set_last_active_time_to_now'):
autostop_lib.set_last_active_time_to_now()
"""))
if setup_cmd is not None:
self._code += [
textwrap.dedent(f"""\
Expand Down Expand Up @@ -301,7 +312,7 @@ def add_gang_scheduling_placement_group(
pg = ray_util.placement_group({json.dumps(bundles)}, 'STRICT_SPREAD')
plural = 's' if {num_nodes} > 1 else ''
node_str = f'{num_nodes} node{{plural}}'
message = '' if setup_cmd is not None else {_CTRL_C_TIP_MESSAGE!r} + '\\n'
message += f'INFO: Waiting for task resources on {{node_str}}. This will block if the cluster is full.'
print(message,
Expand Down Expand Up @@ -447,7 +458,7 @@ def add_ray_task(self,
sky_env_vars_dict['SKYPILOT_NODE_RANK'] = ip_rank_map[ip]
# Environment starting with `SKY_` is deprecated.
sky_env_vars_dict['SKY_NODE_RANK'] = ip_rank_map[ip]
sky_env_vars_dict['SKYPILOT_INTERNAL_JOB_ID'] = {self.job_id}
# Environment starting with `SKY_` is deprecated.
sky_env_vars_dict['SKY_INTERNAL_JOB_ID'] = {self.job_id}
Expand Down
39 changes: 34 additions & 5 deletions sky/skylet/autostop_lib.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
"""Sky autostop utility function."""
"""Autostop utilities."""
import pickle
import psutil
import shlex
import time
from typing import List, Optional

from sky import sky_logging
from sky.skylet import configs

AUTOSTOP_CONFIG_KEY = 'autostop_config'
logger = sky_logging.init_logger(__name__)

_AUTOSTOP_CONFIG_KEY = 'autostop_config'

# This key-value is stored inside the 'configs' sqlite3 database, because both
# user-issued commands (this module) and the Skylet process running the
# AutostopEvent need to access that state.
_AUTOSTOP_LAST_ACTIVE_TIME = 'autostop_last_active_time'


class AutostopConfig:
Expand All @@ -29,8 +38,8 @@ def __set_state__(self, state: dict):
self.__dict__.update(state)


def get_autostop_config() -> Optional[AutostopConfig]:
config_str = configs.get_config(AUTOSTOP_CONFIG_KEY)
def get_autostop_config() -> AutostopConfig:
config_str = configs.get_config(_AUTOSTOP_CONFIG_KEY)
if config_str is None:
return AutostopConfig(-1, -1, None)
return pickle.loads(config_str)
Expand All @@ -39,7 +48,27 @@ def get_autostop_config() -> Optional[AutostopConfig]:
def set_autostop(idle_minutes: int, backend: Optional[str], down: bool) -> None:
boot_time = psutil.boot_time()
autostop_config = AutostopConfig(idle_minutes, boot_time, backend, down)
configs.set_config(AUTOSTOP_CONFIG_KEY, pickle.dumps(autostop_config))
prev_autostop_config = get_autostop_config()
configs.set_config(_AUTOSTOP_CONFIG_KEY, pickle.dumps(autostop_config))
logger.debug(f'set_autostop(): idle_minutes {idle_minutes}, down {down}.')
if (prev_autostop_config.autostop_idle_minutes < 0 or
prev_autostop_config.boot_time != psutil.boot_time()):
# Either autostop never set, or has been canceled. Reset timer.
set_last_active_time_to_now()


def get_last_active_time() -> float:
"""Returns the last active time, or -1 if none has been set."""
result = configs.get_config(_AUTOSTOP_LAST_ACTIVE_TIME)
if result is not None:
return float(result)
return -1


def set_last_active_time_to_now() -> None:
"""Sets the last active time to time.time()."""
logger.debug('Setting last active time.')
configs.set_config(_AUTOSTOP_LAST_ACTIVE_TIME, str(time.time()))


class AutostopCodeGen:
Expand Down
44 changes: 28 additions & 16 deletions sky/skylet/configs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""skylet configs"""
"""Skylet configs."""
import contextlib
import os
import pathlib
import sqlite3
Expand All @@ -7,26 +8,37 @@
_DB_PATH = os.path.expanduser('~/.sky/skylet_config.db')
os.makedirs(pathlib.Path(_DB_PATH).parents[0], exist_ok=True)

_CONN = sqlite3.connect(_DB_PATH)
_CURSOR = _CONN.cursor()

_CURSOR.execute("""\
CREATE TABLE IF NOT EXISTS config (
key TEXT PRIMARY KEY,
value TEXT)""")
@contextlib.contextmanager
def _safe_cursor():
"""A newly created, auto-commiting, auto-closing cursor."""
conn = sqlite3.connect(_DB_PATH)
cursor = conn.cursor()
try:
yield cursor
finally:
cursor.close()
conn.commit()
conn.close()

_CONN.commit()

with _safe_cursor() as c: # Call it 'c' to avoid pylint complaining.
c.execute("""\
CREATE TABLE IF NOT EXISTS config (
key TEXT PRIMARY KEY,
value TEXT)""")


def get_config(key: str) -> Optional[str]:
rows = _CURSOR.execute('SELECT value FROM config WHERE key = ?', (key,))
for (value,) in rows:
return value
with _safe_cursor() as cursor:
rows = cursor.execute('SELECT value FROM config WHERE key = ?', (key,))
for (value,) in rows:
return value


def set_config(key: str, value: str) -> None:
_CURSOR.execute(
"""\
INSERT OR REPLACE INTO config VALUES (?, ?)
""", (key, value))
_CONN.commit()
with _safe_cursor() as cursor:
cursor.execute(
"""\
INSERT OR REPLACE INTO config VALUES (?, ?)
""", (key, value))
30 changes: 20 additions & 10 deletions sky/skylet/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,34 +77,44 @@ def _run(self):


class AutostopEvent(SkyletEvent):
"""Skylet event for autostop."""
"""Skylet event for autostop.
Idleness timer gets set to 0 whenever:
- A first autostop setting is set. By "first", either there's never any
autostop setting set, or the last autostop setting is a cancel (idle
minutes < 0); or
- This event wakes up and job_lib.is_cluster_idle() returns False; or
- The cluster has restarted; or
- A job is submitted (handled in the backend; not here).
"""
EVENT_INTERVAL_SECONDS = 60

_UPSCALING_PATTERN = re.compile(r'upscaling_speed: (\d+)')
_CATCH_NODES = re.compile(r'cache_stopped_nodes: (.*)')

def __init__(self):
super().__init__()
self.last_active_time = time.time()
self.ray_yaml_path = os.path.abspath(
autostop_lib.set_last_active_time_to_now()
self._ray_yaml_path = os.path.abspath(
os.path.expanduser(backend_utils.SKY_RAY_YAML_REMOTE_PATH))

def _run(self):
autostop_config = autostop_lib.get_autostop_config()

if (autostop_config.autostop_idle_minutes < 0 or
autostop_config.boot_time != psutil.boot_time()):
self.last_active_time = time.time()
autostop_lib.set_last_active_time_to_now()
logger.debug('autostop_config not set. Skipped.')
return

if job_lib.is_cluster_idle():
idle_minutes = (time.time() - self.last_active_time) // 60
idle_minutes = (time.time() -
autostop_lib.get_last_active_time()) // 60
logger.debug(
f'Idle minutes: {idle_minutes}, '
f'AutoStop config: {autostop_config.autostop_idle_minutes}')
else:
self.last_active_time = time.time()
autostop_lib.set_last_active_time_to_now()
idle_minutes = -1
logger.debug(
'Not idle. Reset idle minutes.'
Expand All @@ -118,21 +128,21 @@ def _run(self):
def _stop_cluster(self, autostop_config):
if (autostop_config.backend ==
cloud_vm_ray_backend.CloudVmRayBackend.NAME):
self._replace_yaml_for_stopping(self.ray_yaml_path,
self._replace_yaml_for_stopping(self._ray_yaml_path,
autostop_config.down)
# `ray up` is required to reset the upscaling speed and min/max
# workers. Otherwise, `ray down --workers-only` will continuously
# scale down and up.
subprocess.run([
'ray', 'up', '-y', '--restart-only', '--disable-usage-stats',
self.ray_yaml_path
self._ray_yaml_path
],
check=True)
# Stop the workers first to avoid orphan workers.
subprocess.run(
['ray', 'down', '-y', '--workers-only', self.ray_yaml_path],
['ray', 'down', '-y', '--workers-only', self._ray_yaml_path],
check=True)
subprocess.run(['ray', 'down', '-y', self.ray_yaml_path],
subprocess.run(['ray', 'down', '-y', self._ray_yaml_path],
check=True)
else:
raise NotImplementedError
Expand Down
44 changes: 39 additions & 5 deletions tests/test_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,15 +625,46 @@ def test_autostop():
[
f'sky launch -y -d -c {name} --num-nodes 2 examples/minimal.yaml',
f'sky autostop -y {name} -i 1',

# Ensure autostop is set.
f'sky status | grep {name} | grep "1m"',
'sleep 180',

# Ensure the cluster is not stopped early.
'sleep 45',
f'sky status --refresh | grep {name} | grep UP',

# Ensure the cluster is STOPPED.
'sleep 90',
f'sky status --refresh | grep {name} | grep STOPPED',

# Ensure the cluster is UP and the autostop setting is reset ('-').
f'sky start -y {name}',
f'sky status | grep {name} | grep UP', # Ensure the cluster is UP.
f'sky status | grep {name} | grep -E "UP\s+-"',

# Ensure the job succeeded.
f'sky exec {name} examples/minimal.yaml',
f'sky logs {name} 2 --status', # Ensure the job succeeded.
f'sky logs {name} 2 --status',

# Test restarting the idleness timer via cancel + reset:
f'sky autostop -y {name} -i 1', # Idleness starts counting.
'sleep 45', # Almost reached the threshold.
f'sky autostop -y {name} --cancel',
f'sky autostop -y {name} -i 1', # Should restart the timer.
'sleep 45',
f'sky status --refresh | grep {name} | grep UP',
'sleep 90',
f'sky status --refresh | grep {name} | grep STOPPED',

# Test restarting the idleness timer via exec:
f'sky start -y {name}',
f'sky status | grep {name} | grep -E "UP\s+-"',
f'sky autostop -y {name} -i 1', # Idleness starts counting.
'sleep 45', # Almost reached the threshold.
f'sky exec {name} echo hi', # Should restart the timer.
'sleep 45',
f'sky status --refresh | grep {name} | grep UP',
'sleep 90',
f'sky status --refresh | grep {name} | grep STOPPED',
],
f'sky down -y {name}',
timeout=20 * 60,
Expand All @@ -647,12 +678,15 @@ def test_autodown():
test = Test(
'autodown',
[
f'sky launch -y -d -c {name} --num-nodes 2 --cloud gcp examples/minimal.yaml',
f'sky launch -y -d -c {name} --num-nodes 2 --cloud aws examples/minimal.yaml',
f'sky autostop -y {name} --down -i 1',
# Ensure autostop is set.
f'sky status | grep {name} | grep "1m (down)"',
'sleep 240',
# Ensure the cluster is not terminated early.
'sleep 45',
f'sky status --refresh | grep {name} | grep UP',
# Ensure the cluster is terminated.
'sleep 200',
f's=$(SKYPILOT_DEBUG=0 sky status --refresh) && printf "$s" && {{ echo "$s" | grep {name} | grep "Autodowned cluster\|terminated on the cloud"; }} || {{ echo "$s" | grep {name} && exit 1 || exit 0; }}',
f'sky launch -y -d -c {name} --cloud aws --num-nodes 2 --down examples/minimal.yaml',
f'sky status | grep {name} | grep UP', # Ensure the cluster is UP.
Expand Down

0 comments on commit ab9a802

Please sign in to comment.