Skip to content

Commit

Permalink
[robustness] cover some potential resource leakage cases (skypilot-or…
Browse files Browse the repository at this point in the history
…g#4443)

* if a newly-created cluster is missing from the cloud, wait before deleting

Addresses skypilot-org#4431.

* confirm cluster actually terminates before deleting from the db

* avoid deleting cluster data outside the primary provision loop

* tweaks

* Apply suggestions from code review

Co-authored-by: Zhanghao Wu <[email protected]>

* use usage_intervals for new cluster detection

get_cluster_duration will include the total duration of the cluster since its
initial launch, while launched_at may be reset by sky launch on an existing
cluster. So this is a more accurate method to check.

* fix terminating/stopping state for Lambda and Paperspace

* Revert "use usage_intervals for new cluster detection"

This reverts commit aa6d2e9.

* check cloud.STATUS_VERSION before calling query_instances

* avoid try/catch when querying instances

* update comments

---------

Co-authored-by: Zhanghao Wu <[email protected]>
  • Loading branch information
2 people authored and zpoint committed Dec 9, 2024
1 parent 6653026 commit 77b11f3
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 21 deletions.
49 changes: 40 additions & 9 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,16 @@
_ENDPOINTS_RETRY_MESSAGE = ('If the cluster was recently started, '
'please retry after a while.')

# If a cluster is less than LAUNCH_DOUBLE_CHECK_WINDOW seconds old, and we don't
# see any instances in the cloud, the instances might be in the proccess of
# being created. We will wait LAUNCH_DOUBLE_CHECK_DELAY seconds and then double
# check to make sure there are still no instances. LAUNCH_DOUBLE_CHECK_DELAY
# should be set longer than the delay between (sending the create instance
# request) and (the instances appearing on the cloud).
# See https://github.com/skypilot-org/skypilot/issues/4431.
_LAUNCH_DOUBLE_CHECK_WINDOW = 60
_LAUNCH_DOUBLE_CHECK_DELAY = 1

# Include the fields that will be used for generating tags that distinguishes
# the cluster in ray, to avoid the stopped cluster being discarded due to
# updates in the yaml template.
Expand Down Expand Up @@ -1732,13 +1742,12 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
logger.debug(
f'Refreshing status ({cluster_name!r}) failed to get IPs.')
except RuntimeError as e:
logger.debug(str(e))
logger.debug(common_utils.format_exception(e))
except Exception as e: # pylint: disable=broad-except
# This can be raised by `external_ssh_ports()`, due to the
# underlying call to kubernetes API.
logger.debug(
f'Refreshing status ({cluster_name!r}) failed: '
f'{common_utils.format_exception(e, use_bracket=True)}')
logger.debug(f'Refreshing status ({cluster_name!r}) failed: ',
exc_info=e)
return False

# Determining if the cluster is healthy (UP):
Expand All @@ -1765,6 +1774,24 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
return record

# All cases below are transitioning the cluster to non-UP states.

if (not node_statuses and handle.launched_resources.cloud.STATUS_VERSION >=
clouds.StatusVersion.SKYPILOT):
# Note: launched_at is set during sky launch, even on an existing
# cluster. This will catch the case where the cluster was terminated on
# the cloud and restarted by sky launch.
time_since_launch = time.time() - record['launched_at']
if (record['status'] == status_lib.ClusterStatus.INIT and
time_since_launch < _LAUNCH_DOUBLE_CHECK_WINDOW):
# It's possible the instances for this cluster were just created,
# and haven't appeared yet in the cloud API/console. Wait for a bit
# and check again. This is a best-effort leak prevention check.
# See https://github.com/skypilot-org/skypilot/issues/4431.
time.sleep(_LAUNCH_DOUBLE_CHECK_DELAY)
node_statuses = _query_cluster_status_via_cloud_api(handle)
# Note: even if all the node_statuses are UP now, we will still
# consider this cluster abnormal, and its status will be INIT.

if len(node_statuses) > handle.launched_nodes:
# Unexpected: in the queried region more than 1 cluster with the same
# constructed name tag returned. This will typically not happen unless
Expand Down Expand Up @@ -1793,13 +1820,15 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
f'{colorama.Style.RESET_ALL}')
assert len(node_statuses) <= handle.launched_nodes

# If the node_statuses is empty, all the nodes are terminated. We can
# safely set the cluster status to TERMINATED. This handles the edge case
# where the cluster is terminated by the user manually through the UI.
# If the node_statuses is empty, it should mean that all the nodes are
# terminated and we can set the cluster status to TERMINATED. This handles
# the edge case where the cluster is terminated by the user manually through
# the UI.
to_terminate = not node_statuses

# A cluster is considered "abnormal", if not all nodes are TERMINATED or
# not all nodes are STOPPED. We check that with the following logic:
# A cluster is considered "abnormal", if some (but not all) nodes are
# TERMINATED, or not all nodes are STOPPED. We check that with the following
# logic:
# * Not all nodes are terminated and there's at least one node
# terminated; or
# * Any of the non-TERMINATED nodes is in a non-STOPPED status.
Expand All @@ -1811,6 +1840,8 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
# cluster is probably down.
# * The cluster is partially terminated or stopped should be considered
# abnormal.
# * The cluster is partially or completely in the INIT state, which means
# that provisioning was interrupted. This is considered abnormal.
#
# An abnormal cluster will transition to INIT and have any autostop setting
# reset (unless it's autostopping/autodowning).
Expand Down
58 changes: 49 additions & 9 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@
# The maximum retry count for fetching IP address.
_FETCH_IP_MAX_ATTEMPTS = 3

# How many times to query the cloud provider to make sure instances are
# stopping/terminating, and how long to wait between each query.
_TEARDOWN_WAIT_MAX_ATTEMPTS = 10
_TEARDOWN_WAIT_BETWEEN_ATTEMPS_SECONDS = 1

_TEARDOWN_FAILURE_MESSAGE = (
f'\n{colorama.Fore.RED}Failed to terminate '
'{cluster_name}. {extra_reason}'
Expand Down Expand Up @@ -2357,15 +2362,17 @@ def get_command_runners(self,
zip(ip_list, port_list), **ssh_credentials)
return runners
if self.cached_cluster_info is None:
# We have `or self.cached_external_ips is None` here, because
# We have `and self.cached_external_ips is None` here, because
# when a cluster's cloud is just upgraded to the new provsioner,
# although it has the cached_external_ips, the cached_cluster_info
# can be None. We need to update it here, even when force_cached is
# set to True.
# TODO: We can remove `self.cached_external_ips is None` after
# version 0.8.0.
assert not force_cached or self.cached_external_ips is not None, (
force_cached, self.cached_external_ips)
if force_cached and self.cached_external_ips is None:
raise RuntimeError(
'Tried to use cached cluster info, but it\'s missing for '
f'cluster "{self.cluster_name}"')
self._update_cluster_info()
assert self.cached_cluster_info is not None, self
runners = provision_lib.get_command_runners(
Expand Down Expand Up @@ -2784,9 +2791,6 @@ def _provision(
if e.no_failover:
error_message = str(e)
else:
# Clean up the cluster's entry in `sky status`.
global_user_state.remove_cluster(cluster_name,
terminate=True)
usage_lib.messages.usage.update_final_cluster_status(
None)
error_message = (
Expand Down Expand Up @@ -3928,7 +3932,6 @@ def teardown_no_lock(self,
limit=1000).get_result()['items']
vpc_id = None
try:
# pylint: disable=line-too-long
vpc_id = vpcs_filtered_by_tags_and_region[0]['crn'].rsplit(
':', 1)[-1]
vpc_found = True
Expand All @@ -3937,7 +3940,6 @@ def teardown_no_lock(self,
returncode = -1

if vpc_found:
# pylint: disable=line-too-long E1136
# Delete VPC and it's associated resources
vpc_provider = IBMVPCProvider(
config_provider['resource_group_id'], region,
Expand Down Expand Up @@ -4058,6 +4060,7 @@ def post_teardown_cleanup(self,
* Removing the terminated cluster's scripts and ray yaml files.
"""
cluster_name_on_cloud = handle.cluster_name_on_cloud
cloud = handle.launched_resources.cloud

if (terminate and handle.launched_resources.is_image_managed is True):
# Delete the image when terminating a "cloned" cluster, i.e.,
Expand All @@ -4078,7 +4081,6 @@ def post_teardown_cleanup(self,
'remove it manually to avoid image leakage. Details: '
f'{common_utils.format_exception(e, use_bracket=True)}')
if terminate:
cloud = handle.launched_resources.cloud
config = common_utils.read_yaml(handle.cluster_yaml)
try:
cloud.check_features_are_supported(
Expand All @@ -4105,6 +4107,44 @@ def post_teardown_cleanup(self,
config = common_utils.read_yaml(handle.cluster_yaml)
backend_utils.SSHConfigHelper.remove_cluster(handle.cluster_name)

# Confirm that instances have actually transitioned state before
# updating the state database. We do this immediately before removing
# the state from the database, so that we can guarantee that this is
# always called before the state is removed. We considered running this
# check as part of provisioner.teardown_cluster or
# provision.terminate_instances, but it would open the door code paths
# that successfully call this function but do not first call
# teardown_cluster or terminate_instances. See
# https://github.com/skypilot-org/skypilot/pull/4443#discussion_r1872798032
attempts = 0
while True:
logger.debug(f'instance statuses attempt {attempts + 1}')
node_status_dict = provision_lib.query_instances(
repr(cloud),
cluster_name_on_cloud,
config['provider'],
non_terminated_only=False)

unexpected_node_state: Optional[Tuple[str, str]] = None
for node_id, node_status in node_status_dict.items():
logger.debug(f'{node_id} status: {node_status}')
# FIXME(cooperc): Some clouds (e.g. GCP) do not distinguish
# between "stopping/stopped" and "terminating/terminated", so we
# allow for either status instead of casing on `terminate`.
if node_status not in [None, status_lib.ClusterStatus.STOPPED]:
unexpected_node_state = (node_id, node_status)

if unexpected_node_state is None:
break

attempts += 1
if attempts < _TEARDOWN_WAIT_MAX_ATTEMPTS:
time.sleep(_TEARDOWN_WAIT_BETWEEN_ATTEMPS_SECONDS)
else:
(node_id, node_status) = unexpected_node_state
raise RuntimeError(f'Instance {node_id} in unexpected state '
f'{node_status}.')

global_user_state.remove_cluster(handle.cluster_name,
terminate=terminate)

Expand Down
2 changes: 1 addition & 1 deletion sky/provision/azure/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ def cluster_status_map(
) -> Dict['AzureInstanceStatus', Optional[status_lib.ClusterStatus]]:
return {
cls.PENDING: status_lib.ClusterStatus.INIT,
cls.STOPPING: status_lib.ClusterStatus.INIT,
cls.RUNNING: status_lib.ClusterStatus.UP,
cls.STOPPING: status_lib.ClusterStatus.STOPPED,
cls.STOPPED: status_lib.ClusterStatus.STOPPED,
cls.DELETING: None,
}
Expand Down
2 changes: 2 additions & 0 deletions sky/provision/gcp/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def _filter_instances(
# non_terminated_only=True?
# Will there be callers who would want this to be False?
# stop() and terminate() for example already implicitly assume non-terminated.
# Currently, even with non_terminated_only=False, we may not have a dict entry
# for terminated instances, if they have already been fully deleted.
@common_utils.retry
def query_instances(
cluster_name_on_cloud: str,
Expand Down
2 changes: 1 addition & 1 deletion sky/provision/lambda_cloud/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def query_instances(
'booting': status_lib.ClusterStatus.INIT,
'active': status_lib.ClusterStatus.UP,
'unhealthy': status_lib.ClusterStatus.INIT,
'terminating': status_lib.ClusterStatus.INIT,
'terminating': None,
}
statuses: Dict[str, Optional[status_lib.ClusterStatus]] = {}
for instance_id, instance in instances.items():
Expand Down
3 changes: 2 additions & 1 deletion sky/provision/paperspace/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,13 @@ def query_instances(
assert provider_config is not None, (cluster_name_on_cloud, provider_config)
instances = _filter_instances(cluster_name_on_cloud, None)

# https://docs.digitalocean.com/reference/paperspace/core/commands/machines/#show
status_map = {
'starting': status_lib.ClusterStatus.INIT,
'restarting': status_lib.ClusterStatus.INIT,
'upgrading': status_lib.ClusterStatus.INIT,
'provisioning': status_lib.ClusterStatus.INIT,
'stopping': status_lib.ClusterStatus.INIT,
'stopping': status_lib.ClusterStatus.STOPPED,
'serviceready': status_lib.ClusterStatus.INIT,
'ready': status_lib.ClusterStatus.UP,
'off': status_lib.ClusterStatus.STOPPED,
Expand Down

0 comments on commit 77b11f3

Please sign in to comment.