From ada83dec50fce44756bc6af372c5c885aeefc996 Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Thu, 1 Dec 2022 15:03:53 -0800
Subject: [PATCH 1/8] fix in controller

---
 sky/backends/backend_utils.py        | 5 +----
 sky/backends/cloud_vm_ray_backend.py | 8 ++++----
 sky/spot/recovery_strategy.py        | 7 +++++++
 3 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py
index 1bf9cedbf58..17b49db742c 100644
--- a/sky/backends/backend_utils.py
+++ b/sky/backends/backend_utils.py
@@ -1226,11 +1226,8 @@ def _get_tpu_vm_pod_ips(ray_config: Dict[str, Any],
 
     cluster_name = ray_config['cluster_name']
     zone = ray_config['provider']['availability_zone']
-    # Excluding preempted VMs is safe as they are already terminated and
-    # do not charge.
     query_cmd = (f'gcloud compute tpus tpu-vm list --filter='
-                 f'"(labels.ray-cluster-name={cluster_name} AND '
-                 f'state!=PREEMPTED)" '
+                 f'\\(labels.ray-cluster-name={cluster_name}\\) '
                  f'--zone={zone} --format=value\\(name\\)')
     if not get_internal_ips:
         tpuvm_cmd = (f'gcloud compute tpus tpu-vm describe $({query_cmd})'
diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py
index d85d4d1148e..26e5af23544 100644
--- a/sky/backends/cloud_vm_ray_backend.py
+++ b/sky/backends/cloud_vm_ray_backend.py
@@ -1343,6 +1343,9 @@ def need_ray_up(
             logger.info(f'{style.BRIGHT}Setting up TPU VM Pod workers...'
                         f'{style.RESET_ALL}')
             self._tpu_pod_setup(cluster_config_file, cluster_handle)
+            logger.info(
+                f'{style.BRIGHT}Finished setting up TPU VM Pod workers...'
+                f'{style.RESET_ALL}')
 
         # Only 1 node or head node provisioning failure.
         if cluster_handle.launched_nodes == 1 and returncode == 0:
@@ -2680,12 +2683,9 @@ def teardown_no_lock(self,
 
                     # check if gcloud includes TPU VM API
                     backend_utils.check_gcp_cli_include_tpu_vm()
-                    # Excluding preempted VMs is safe as they are already
-                    # terminated and do not charge.
                     query_cmd = (
                         f'gcloud compute tpus tpu-vm list --filter='
-                        f'"(labels.ray-cluster-name={cluster_name} AND '
-                        f'state!=PREEMPTED)" '
+                        f'\\(labels.ray-cluster-name={cluster_name}\\) '
                         f'--zone={zone} --format=value\\(name\\)')
                     terminate_cmd = (
                         f'gcloud compute tpus tpu-vm delete --zone={zone}'
diff --git a/sky/spot/recovery_strategy.py b/sky/spot/recovery_strategy.py
index 3459cb2edf7..a59d1fe85c1 100644
--- a/sky/spot/recovery_strategy.py
+++ b/sky/spot/recovery_strategy.py
@@ -12,6 +12,7 @@
 from sky.spot import spot_utils
 from sky.usage import usage_lib
 from sky.utils import common_utils
+from sky.utils import tpu_utils
 from sky.utils import ux_utils
 
 if typing.TYPE_CHECKING:
@@ -305,6 +306,12 @@ def recover(self) -> float:
             new_resources = resources.copy(cloud=launched_cloud,
                                            region=launched_region)
             task.set_resources({new_resources})
+
+            # Note: Preempted TPU VM needs to be cleaned up first.
+            # Otherwise, it will occupy the quota.
+            is_tpuvm = tpu_utils.is_tpu_vm(new_resources)
+            if is_tpuvm:
+                self.terminate_cluster()
             # Not using self.launch to avoid the retry until up logic.
             launched_time = self._launch(raise_on_failure=False)
             # Restore the original dag, i.e. reset the region constraint.
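The reasoning behind this first commit is an ordering constraint: a preempted TPU VM is not deleted automatically, cannot be reused, and keeps counting against the TPU quota, so the recovery path has to tear the old cluster down before relaunching in place (and the gcloud list filter has to keep PREEMPTED instances visible so that teardown can still find them). A minimal sketch of that ordering, with hypothetical helper callables standing in for SkyPilot's actual backend calls:

    from typing import Callable

    def recover_in_place(cluster_name: str,
                         is_tpu_vm: bool,
                         terminate_cluster: Callable[[str], None],
                         relaunch_cluster: Callable[[str], float]) -> float:
        """Relaunch a preempted spot cluster in the same region/zone.

        A preempted TPU VM stays around in the PREEMPTED state and keeps
        holding TPU quota, so it must be terminated before the relaunch;
        otherwise the new launch can fail with a quota error.
        """
        if is_tpu_vm:
            terminate_cluster(cluster_name)    # free the quota first
        return relaunch_cluster(cluster_name)  # then retry the launch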
From 3160afdbab87023f7d8498eb5cf4ab022dde58ba Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Thu, 1 Dec 2022 15:16:09 -0800
Subject: [PATCH 2/8] remove debug msg

---
 sky/backends/cloud_vm_ray_backend.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py
index 26e5af23544..19566bcaf22 100644
--- a/sky/backends/cloud_vm_ray_backend.py
+++ b/sky/backends/cloud_vm_ray_backend.py
@@ -1343,9 +1343,6 @@ def need_ray_up(
             logger.info(f'{style.BRIGHT}Setting up TPU VM Pod workers...'
                         f'{style.RESET_ALL}')
             self._tpu_pod_setup(cluster_config_file, cluster_handle)
-            logger.info(
-                f'{style.BRIGHT}Finished setting up TPU VM Pod workers...'
-                f'{style.RESET_ALL}')
 
         # Only 1 node or head node provisioning failure.
         if cluster_handle.launched_nodes == 1 and returncode == 0:

From c836216c1be4a448d47f2a2f32e32886e52895b7 Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Thu, 1 Dec 2022 15:17:15 -0800
Subject: [PATCH 3/8] msg

---
 sky/spot/recovery_strategy.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sky/spot/recovery_strategy.py b/sky/spot/recovery_strategy.py
index a59d1fe85c1..fe409a52bda 100644
--- a/sky/spot/recovery_strategy.py
+++ b/sky/spot/recovery_strategy.py
@@ -307,8 +307,8 @@ def recover(self) -> float:
                                            region=launched_region)
             task.set_resources({new_resources})
 
-            # Note: Preempted TPU VM needs to be cleaned up first.
-            # Otherwise, it will occupy the quota.
+            # Note: Preempted TPU VM cannot be reused and needs to be
+            # cleaned up. Otherwise, it will occupy the quota.
             is_tpuvm = tpu_utils.is_tpu_vm(new_resources)
             if is_tpuvm:
                 self.terminate_cluster()

From c5233e794263d8ecd1099d4893592da957e7ef64 Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Thu, 1 Dec 2022 18:56:12 -0800
Subject: [PATCH 4/8] handle job_status == None case and refactor

---
 sky/spot/controller.py        | 59 ++++++++++++++---------------
 sky/spot/recovery_strategy.py |  6 ----
 2 files changed, 24 insertions(+), 41 deletions(-)

diff --git a/sky/spot/controller.py b/sky/spot/controller.py
index 27640bbfe6a..5d63c8e87d9 100644
--- a/sky/spot/controller.py
+++ b/sky/spot/controller.py
@@ -85,30 +85,6 @@ def _run(self):
             job_status = spot_utils.get_job_status(self._backend,
                                                    self._cluster_name)
 
-            if job_status is not None and not job_status.is_terminal():
-                need_recovery = False
-                if self._task.num_nodes > 1:
-                    # Check the cluster status for multi-node jobs, since the
-                    # job may not be set to FAILED immediately when only some
-                    # of the nodes are preempted.
-                    (cluster_status,
-                     handle) = backend_utils.refresh_cluster_status_handle(
-                         self._cluster_name, force_refresh=True)
-                    if cluster_status != global_user_state.ClusterStatus.UP:
-                        # recover the cluster if it is not up.
-                        # The status could be None when the cluster is preempted
-                        # right after the job was found FAILED.
-                        cluster_status_str = ('is preempted'
-                                              if cluster_status is None else
-                                              f'status {cluster_status.value}')
-                        logger.info(f'Cluster {cluster_status_str}. '
-                                    'Recovering...')
-                        need_recovery = True
-                if not need_recovery:
-                    # The job and cluster are healthy, continue to monitor the
-                    # job status.
-                    continue
-
             if job_status == job_lib.JobStatus.SUCCEEDED:
                 end_time = spot_utils.get_job_timestamp(self._backend,
                                                         self._cluster_name,
                                                         get_end_time=True)
                 # The job is done.
                 spot_state.set_succeeded(self._job_id, end_time=end_time)
                 break
 
-            if job_status == job_lib.JobStatus.FAILED:
-                # Check the status of the spot cluster. If it is not UP,
-                # the cluster is preempted.
-                (cluster_status,
-                 handle) = backend_utils.refresh_cluster_status_handle(
-                     self._cluster_name, force_refresh=True)
-                if cluster_status == global_user_state.ClusterStatus.UP:
-                    # The user code has probably crashed.
+            # Get the up-to-date cluster status to determine
+            # whether preemption happens.
+            (cluster_status,
+             handle) = backend_utils.refresh_cluster_status_handle(
+                 self._cluster_name, force_refresh=True)
+
+            if cluster_status == global_user_state.ClusterStatus.UP:
+                if job_status is None:
+                    # Rare case, likely to be network issue.
+                    # to be conservative, we still recover the cluster.
+                    pass
+                elif not job_status.is_terminal():
+                    # The job is still running.
+                    continue
+                elif job_status == job_lib.JobStatus.FAILED:
+                    # The user code has probably crashed, fail immediately.
                     end_time = spot_utils.get_job_timestamp(self._backend,
                                                             self._cluster_name,
                                                             get_end_time=True)
@@ -140,11 +124,16 @@ def _run(self):
                         failure_type=spot_state.SpotStatus.FAILED,
                         end_time=end_time)
                     break
+                else:
+                    raise RuntimeError(f'Unexpected job status: {job_status}')
+
+            # Failed to connect to the cluster or the cluster is partially down.
             # cluster can be down, INIT or STOPPED, based on the interruption
             # behavior of the cloud.
-            # Failed to connect to the cluster or the cluster is partially down.
-            # job_status is None or job_status == job_lib.JobStatus.FAILED
-            logger.info('The cluster is preempted.')
+            cluster_status_str = ('' if cluster_status is None else
+                                  f' (status: {cluster_status.value})')
+            logger.info(
+                f'Cluster is preempted{cluster_status_str}. Recovering...')
             spot_state.set_recovering(self._job_id)
             recovered_time = self._strategy_executor.recover()
             spot_state.set_recovered(self._job_id,
diff --git a/sky/spot/recovery_strategy.py b/sky/spot/recovery_strategy.py
index fe409a52bda..394b1378f55 100644
--- a/sky/spot/recovery_strategy.py
+++ b/sky/spot/recovery_strategy.py
@@ -12,7 +12,6 @@
 from sky.spot import spot_utils
 from sky.usage import usage_lib
 from sky.utils import common_utils
-from sky.utils import tpu_utils
 from sky.utils import ux_utils
 
 if typing.TYPE_CHECKING:
@@ -307,11 +306,6 @@ def recover(self) -> float:
                                            region=launched_region)
             task.set_resources({new_resources})
 
-            # Note: Preempted TPU VM cannot be reused and needs to be
-            # cleaned up. Otherwise, it will occupy the quota.
-            is_tpuvm = tpu_utils.is_tpu_vm(new_resources)
-            if is_tpuvm:
-                self.terminate_cluster()
             # Not using self.launch to avoid the retry until up logic.
             launched_time = self._launch(raise_on_failure=False)
             # Restore the original dag, i.e. reset the region constraint.

From 02623238e717808db8e6a1d6dff28c71e5c62ac6 Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Thu, 1 Dec 2022 18:56:55 -0800
Subject: [PATCH 5/8] space

---
 sky/spot/recovery_strategy.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sky/spot/recovery_strategy.py b/sky/spot/recovery_strategy.py
index 394b1378f55..3459cb2edf7 100644
--- a/sky/spot/recovery_strategy.py
+++ b/sky/spot/recovery_strategy.py
@@ -305,7 +305,6 @@ def recover(self) -> float:
             new_resources = resources.copy(cloud=launched_cloud,
                                            region=launched_region)
             task.set_resources({new_resources})
-
             # Not using self.launch to avoid the retry until up logic.
             launched_time = self._launch(raise_on_failure=False)
             # Restore the original dag, i.e. reset the region constraint.
From 3b2826ce1118da3aba7889524d07ba893abbbc9c Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Thu, 1 Dec 2022 21:39:56 -0800
Subject: [PATCH 6/8] update

---
 sky/spot/controller.py | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/sky/spot/controller.py b/sky/spot/controller.py
index 5d63c8e87d9..8f608bffe2f 100644
--- a/sky/spot/controller.py
+++ b/sky/spot/controller.py
@@ -93,8 +93,13 @@ def _run(self):
                 spot_state.set_succeeded(self._job_id, end_time=end_time)
                 break
 
-            # Get the up-to-date cluster status to determine
-            # whether preemption happens.
+            # For single-node jobs, non-terminated job_status means healthy.
+            if (job_status is not None and not job_status.is_terminal() and
+                    self._task.num_nodes == 1):
+                continue
+
+            # Oterwise, get the up-to-date cluster status to determine
+            # whether cluster is healthy.
             (cluster_status,
              handle) = backend_utils.refresh_cluster_status_handle(
                  self._cluster_name, force_refresh=True)
@@ -125,7 +130,8 @@ def _run(self):
                         end_time=end_time)
                     break
                 else:
-                    raise RuntimeError(f'Unexpected job status: {job_status}')
+                    # unexpected job status, recover the cluster.
+                    logger.info(f'Unexpected job status: {job_status}')
 
             # Failed to connect to the cluster or the cluster is partially down.
             # cluster can be down, INIT or STOPPED, based on the interruption

From 5cb71b2d5e8bf5c86eee5f4b3ea16ffe21e6c58d Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Thu, 1 Dec 2022 22:33:20 -0800
Subject: [PATCH 7/8] comments

---
 sky/spot/controller.py | 40 ++++++++++++++++++++++++----------------
 1 file changed, 24 insertions(+), 16 deletions(-)

diff --git a/sky/spot/controller.py b/sky/spot/controller.py
index 8f608bffe2f..7997cc07dd0 100644
--- a/sky/spot/controller.py
+++ b/sky/spot/controller.py
@@ -93,24 +93,29 @@ def _run(self):
                 spot_state.set_succeeded(self._job_id, end_time=end_time)
                 break
 
-            # For single-node jobs, non-terminated job_status means healthy.
+            # For single-node jobs, nonterminated job_status indicates a
+            # healthy cluster. We can safely continue monitoring.
+            # For multi-node jobs, since the job may not be set to FAILED
+            # immediately (depending on user program) when only some of the
+            # nodes are preempted, need to check the actual cluster status.
             if (job_status is not None and not job_status.is_terminal() and
                     self._task.num_nodes == 1):
                 continue
 
-            # Oterwise, get the up-to-date cluster status to determine
-            # whether cluster is healthy.
+            # Pull the actual cluster status from the cloud provider to
+            # determine whether the cluster is preempted.
             (cluster_status,
              handle) = backend_utils.refresh_cluster_status_handle(
                  self._cluster_name, force_refresh=True)
 
-            if cluster_status == global_user_state.ClusterStatus.UP:
-                if job_status is None:
-                    # Rare case, likely to be network issue.
-                    # to be conservative, we still recover the cluster.
-                    pass
-                elif not job_status.is_terminal():
-                    # The job is still running.
+            is_preempted = False
+            if cluster_status != global_user_state.ClusterStatus.UP:
+                # When the status is not UP, the cluster is likely preempted,
+                # and spot recovery is needed (will be done later in the code).
+                is_preempted = True
+            else:
+                if job_status is not None and not job_status.is_terminal():
+                    # The multi-node job is still running, continue monitoring.
                     continue
                 elif job_status == job_lib.JobStatus.FAILED:
                     # The user code has probably crashed, fail immediately.
                     end_time = spot_utils.get_job_timestamp(self._backend,
                                                             self._cluster_name,
                                                             get_end_time=True)
@@ -130,16 +135,19 @@ def _run(self):
                         failure_type=spot_state.SpotStatus.FAILED,
                         end_time=end_time)
                     break
                 else:
-                    # unexpected job status, recover the cluster.
-                    logger.info(f'Unexpected job status: {job_status}')
+                    # Unexpected job status received, try to recover
+                    # the cluster to be safe.
+                    logger.info(f'Got unexpected job status: {job_status}. '
+                                'Recovering...')
 
             # Failed to connect to the cluster or the cluster is partially down.
             # cluster can be down, INIT or STOPPED, based on the interruption
             # behavior of the cloud.
-            cluster_status_str = ('' if cluster_status is None else
-                                  f' (status: {cluster_status.value})')
-            logger.info(
-                f'Cluster is preempted{cluster_status_str}. Recovering...')
+            if is_preempted:
+                cluster_status_str = ('' if cluster_status is None else
+                                      f' (status: {cluster_status.value})')
+                logger.info(
+                    f'Cluster is preempted{cluster_status_str}. Recovering...')
             spot_state.set_recovering(self._job_id)
             recovered_time = self._strategy_executor.recover()
             spot_state.set_recovered(self._job_id,

From 152c8eba7b4830f108bc03a719f912d8c97de0fe Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Mon, 5 Dec 2022 13:19:11 -0800
Subject: [PATCH 8/8] comments

---
 sky/spot/controller.py | 33 ++++++++++++++++-----------------
 1 file changed, 16 insertions(+), 17 deletions(-)

diff --git a/sky/spot/controller.py b/sky/spot/controller.py
index 7997cc07dd0..deb7fc22fc5 100644
--- a/sky/spot/controller.py
+++ b/sky/spot/controller.py
@@ -108,11 +108,14 @@ def _run(self):
              handle) = backend_utils.refresh_cluster_status_handle(
                  self._cluster_name, force_refresh=True)
 
-            is_preempted = False
             if cluster_status != global_user_state.ClusterStatus.UP:
-                # When the status is not UP, the cluster is likely preempted,
-                # and spot recovery is needed (will be done later in the code).
-                is_preempted = True
+                # The cluster is (partially) preempted. It can be down, INIT
+                # or STOPPED, based on the interruption behavior of the cloud.
+                # Spot recovery is needed (will be done later in the code).
+                cluster_status_str = ('' if cluster_status is None else
+                                      f' (status: {cluster_status.value})')
+                logger.info(
+                    f'Cluster is preempted{cluster_status_str}. Recovering...')
             else:
                 if job_status is not None and not job_status.is_terminal():
                     # The multi-node job is still running, continue monitoring.
@@ -134,20 +137,16 @@ def _run(self):
                         failure_type=spot_state.SpotStatus.FAILED,
                         end_time=end_time)
                     break
-                else:
-                    # Unexpected job status received, try to recover
-                    # the cluster to be safe.
-                    logger.info(f'Got unexpected job status: {job_status}. '
-                                'Recovering...')
+                # Although the cluster is healthy, we fail to access the
+                # job status. Try to recover the job (will not restart the
+                # cluster, if the cluster is healthy).
+                assert job_status is None, job_status
+                logger.info('Failed to fetch the job status while the '
+                            'cluster is healthy. Try to recover the job '
+                            '(the cluster will not be restarted).')
 
-            # Failed to connect to the cluster or the cluster is partially down.
-            # cluster can be down, INIT or STOPPED, based on the interruption
-            # behavior of the cloud.
-            if is_preempted:
-                cluster_status_str = ('' if cluster_status is None else
-                                      f' (status: {cluster_status.value})')
-                logger.info(
-                    f'Cluster is preempted{cluster_status_str}. Recovering...')
+            # Try to recover the spot jobs, when the cluster is preempted
+            # or the job status is failed to be fetched.
             spot_state.set_recovering(self._job_id)
             recovered_time = self._strategy_executor.recover()
             spot_state.set_recovered(self._job_id,
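Taken together, the last commits leave the controller's per-iteration decision as a small status check. The sketch below summarizes that flow; the enums, the num_nodes parameter, and the returned action strings are illustrative stand-ins rather than SkyPilot's actual classes, and RUNNING abbreviates any non-terminal job status:

    from enum import Enum
    from typing import Optional

    class JobStatus(Enum):
        RUNNING = 'RUNNING'      # stands in for any non-terminal status
        SUCCEEDED = 'SUCCEEDED'
        FAILED = 'FAILED'

    class ClusterStatus(Enum):
        UP = 'UP'
        INIT = 'INIT'
        STOPPED = 'STOPPED'

    def next_action(job_status: Optional[JobStatus],
                    cluster_status: Optional[ClusterStatus],
                    num_nodes: int) -> str:
        """One iteration of the monitoring loop after the final commit."""
        if job_status is JobStatus.SUCCEEDED:
            return 'mark job SUCCEEDED and exit'
        # A running single-node job implies a healthy cluster; multi-node
        # jobs must also check the cluster, since a partial preemption may
        # not flip the job status to FAILED right away.
        if job_status is JobStatus.RUNNING and num_nodes == 1:
            return 'keep monitoring'
        if cluster_status is not ClusterStatus.UP:
            # Down, INIT, STOPPED or unknown: treat as preemption.
            return 'recover: cluster is preempted'
        if job_status is JobStatus.RUNNING:
            return 'keep monitoring'            # multi-node job, cluster healthy
        if job_status is JobStatus.FAILED:
            return 'mark job FAILED and exit'   # user code crashed
        # Cluster is healthy but the job status could not be fetched:
        # recover the job without restarting the healthy cluster.
        return 'recover: job status unavailable'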