From f379c700fbcf7b6b2ce30b14a60b097a08f72f50 Mon Sep 17 00:00:00 2001
From: Zhanghao Wu
Date: Tue, 29 Nov 2022 09:01:21 -0800
Subject: [PATCH] [Spot] Make sure the cluster status is not None when showing (#1464)

* Make sure the cluster status is not None when showing
* Fix another potential issue with NoneType of handle
* Add assert
* fix
* format
* Address comments
* Address comments
* format
* format
* fix
* fix
* fix spot cancellation
* format
---
 sky/spot/controller.py        |  7 +++-
 sky/spot/recovery_strategy.py | 63 +++++++++++++++++++++++++----------
 tests/test_smoke.py           |  4 ++-
 3 files changed, 54 insertions(+), 20 deletions(-)

diff --git a/sky/spot/controller.py b/sky/spot/controller.py
index 16c2459603b1..27640bbfe6a8 100644
--- a/sky/spot/controller.py
+++ b/sky/spot/controller.py
@@ -96,7 +96,12 @@ def _run(self):
                     self._cluster_name, force_refresh=True)
                 if cluster_status != global_user_state.ClusterStatus.UP:
                     # recover the cluster if it is not up.
-                    logger.info(f'Cluster status {cluster_status.value}. '
+                    # The status could be None when the cluster is preempted
+                    # right after the job was found FAILED.
+                    cluster_status_str = ('is preempted'
+                                          if cluster_status is None else
+                                          f'status {cluster_status.value}')
+                    logger.info(f'Cluster {cluster_status_str}. '
                                 'Recovering...')
                     need_recovery = True
             if not need_recovery:
diff --git a/sky/spot/recovery_strategy.py b/sky/spot/recovery_strategy.py
index 17d870ecfcc2..3459cb2edf7b 100644
--- a/sky/spot/recovery_strategy.py
+++ b/sky/spot/recovery_strategy.py
@@ -136,6 +136,10 @@ def _launch(self, max_retry=3, raise_on_failure=True) -> Optional[float]:
         Args:
             max_retry: The maximum number of retries. If None, retry forever.
             raise_on_failure: Whether to raise an exception if the launch fails.
+
+        Returns:
+            The job's start timestamp, or None if failed to start and
+            raise_on_failure is False.
         """
         # TODO(zhwu): handle the failure during `preparing sky runtime`.
         retry_cnt = 0
@@ -251,6 +255,31 @@ class FailoverStrategyExecutor(StrategyExecutor, name='FAILOVER', default=True):
 
     _MAX_RETRY_CNT = 240  # Retry for 4 hours.
 
+    def __init__(self, cluster_name: str, backend: 'backends.Backend',
+                 task: 'task_lib.Task', retry_until_up: bool) -> None:
+        super().__init__(cluster_name, backend, task, retry_until_up)
+        # Note down the cloud/region of the launched cluster, so that we can
+        # first retry in the same cloud/region. (Inside recover() we may not
+        # rely on cluster handle, as it can be None if the cluster is
+        # preempted.)
+        self._launched_cloud_region = None
+
+    def _launch(self, max_retry=3, raise_on_failure=True) -> Optional[float]:
+        launch_time = super()._launch(max_retry, raise_on_failure)
+        if launch_time is not None:
+            # Only record the cloud/region if the launch is successful.
+            handle = global_user_state.get_handle_from_cluster_name(
+                self.cluster_name)
+            assert handle is not None, 'Cluster should be launched.'
+            launched_resources = handle.launched_resources
+            self._launched_cloud_region = (launched_resources.cloud,
+                                           launched_resources.region)
+        return launch_time
+
+    def terminate_cluster(self, max_retry: int = 3) -> None:
+        super().terminate_cluster(max_retry)
+        self._launched_cloud_region = None
+
     def recover(self) -> float:
         # 1. Cancel the jobs and launch the cluster with the STOPPED status,
         #    so that it will try on the current region first until timeout.
@@ -264,26 +293,24 @@ def recover(self) -> float:
         # Retry the entire block until the cluster is up, so that the ratio of
         # the time spent in the current region and the time spent in the other
         # region is consistent during the retry.
-        handle = global_user_state.get_handle_from_cluster_name(
-            self.cluster_name)
         while True:
             # Add region constraint to the task, to retry on the same region
-            # first.
-            task = self.dag.tasks[0]
-            resources = list(task.resources)[0]
-            original_resources = resources
-
-            launched_cloud = handle.launched_resources.cloud
-            launched_region = handle.launched_resources.region
-            new_resources = resources.copy(cloud=launched_cloud,
-                                           region=launched_region)
-            task.set_resources({new_resources})
-            # Not using self.launch to avoid the retry until up logic.
-            launched_time = self._launch(raise_on_failure=False)
-            # Restore the original dag, i.e. reset the region constraint.
-            task.set_resources({original_resources})
-            if launched_time is not None:
-                return launched_time
+            # first (if valid).
+            if self._launched_cloud_region is not None:
+                task = self.dag.tasks[0]
+                resources = list(task.resources)[0]
+                original_resources = resources
+
+                launched_cloud, launched_region = self._launched_cloud_region
+                new_resources = resources.copy(cloud=launched_cloud,
+                                               region=launched_region)
+                task.set_resources({new_resources})
+                # Not using self.launch to avoid the retry until up logic.
+                launched_time = self._launch(raise_on_failure=False)
+                # Restore the original dag, i.e. reset the region constraint.
+                task.set_resources({original_resources})
+                if launched_time is not None:
+                    return launched_time
 
             # Step 2
             logger.debug('Terminating unhealthy spot cluster.')
diff --git a/tests/test_smoke.py b/tests/test_smoke.py
index e0bca06bc768..af59fd3870e1 100644
--- a/tests/test_smoke.py
+++ b/tests/test_smoke.py
@@ -937,10 +937,12 @@ def test_spot_cancellation():
             'sleep 10',
             f's=$(sky spot queue); printf "$s"; echo; echo; printf "$s" | grep {name}-3 | head -n1 | grep "CANCELLED"',
             'sleep 90',
+            # The cluster should be terminated (shutting-down) after cancellation. We don't use the
+            # `=` operator here, as there can be multiple VMs with the same name due to the recovery.
             (f's=$(aws ec2 describe-instances --region {region} '
              f'--filters Name=tag:ray-cluster-name,Values={name}-3* '
              f'--query Reservations[].Instances[].State[].Name '
-             '--output text) && printf "$s" && echo; [[ -z "$s" ]] || [[ "$s" = "terminated" ]] || [[ "$s" = "shutting-down" ]]'
+             '--output text) && printf "$s" && echo; [[ -z "$s" ]] || echo "$s" | grep -v -E "pending|running|stopped|stopping"'
             ),
         ])
     run_one_test(test)
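
Note on the overall flow (illustration only, not part of the diff above): with this patch, FailoverStrategyExecutor no longer reads the launched cloud/region from the cluster handle, which can be None after a preemption. Instead it records the location on every successful launch, retries that location first inside recover(), and forgets it when the cluster is terminated. Below is a minimal, self-contained Python sketch of that pattern; StickyRegionFailover, LaunchFn, and the scripted launch outcomes are hypothetical stand-ins, not SkyPilot's actual API.

"""Minimal sketch of the recovery pattern introduced above (hypothetical
names, not the actual SkyPilot classes): remember the cloud/region of the
last successful launch, retry that location first after a preemption, and
clear it on termination so the next attempt falls back to a full failover."""
import time
from typing import Callable, Optional, Tuple

# A launch backend: takes an optional (cloud, region) constraint and returns
# the (cloud, region) actually launched on success, or None on failure.
LaunchFn = Callable[[Optional[Tuple[str, str]]], Optional[Tuple[str, str]]]


class StickyRegionFailover:

    def __init__(self, launch_fn: LaunchFn) -> None:
        self._launch_fn = launch_fn
        # Recorded on every successful launch and cleared on termination.
        # Unlike a cluster handle, this survives a preemption.
        self._launched_cloud_region: Optional[Tuple[str, str]] = None

    def _launch(
            self,
            constraint: Optional[Tuple[str, str]] = None) -> Optional[float]:
        launched = self._launch_fn(constraint)
        if launched is None:
            return None
        self._launched_cloud_region = launched
        return time.time()

    def terminate_cluster(self) -> None:
        self._launched_cloud_region = None

    def recover(self) -> float:
        while True:
            # 1. If the last launch location is known, retry it first: the
            #    preempted capacity often comes back in the same region.
            if self._launched_cloud_region is not None:
                start = self._launch(self._launched_cloud_region)
                if start is not None:
                    return start
            # 2. Otherwise clean up and fail over to any cloud/region.
            self.terminate_cluster()
            start = self._launch(None)
            if start is not None:
                return start


if __name__ == '__main__':
    # Scripted outcomes: the initial launch succeeds in us-east-1, the retry
    # in the same region fails, and the unconstrained failover lands on GCP.
    outcomes = iter([('aws', 'us-east-1'), None, ('gcp', 'us-central1')])
    executor = StickyRegionFailover(lambda constraint: next(outcomes))
    executor._launch(('aws', 'us-east-1'))
    print('recovered at', executor.recover())

Keeping the location on the executor rather than on the handle is the key design choice: the handle disappears together with the preempted cluster, while the executor outlives it.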