From 4d77d3d21f111b6ae5172882484c7102f6bd5eac Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Wed, 7 Dec 2022 15:29:25 -0800
Subject: [PATCH 01/16] safe guard

---
 sky/backends/backend_utils.py | 81 ++++++++++++++++++++++++-----------
 1 file changed, 57 insertions(+), 24 deletions(-)

diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py
index b87b8373592..e39926f5bb8 100644
--- a/sky/backends/backend_utils.py
+++ b/sky/backends/backend_utils.py
@@ -1228,34 +1228,67 @@ def _get_tpu_vm_pod_ips(ray_config: Dict[str, Any],
     query_cmd = (f'gcloud compute tpus tpu-vm list --filter='
                  f'\\(labels.ray-cluster-name={cluster_name}\\) '
                  f'--zone={zone} --format=value\\(name\\)')
-    if not get_internal_ips:
-        tpuvm_cmd = (f'gcloud compute tpus tpu-vm describe $({query_cmd})'
-                     f' --zone {zone} --format="value[delimiter=\'\\n\']'
-                     '(networkEndpoints.accessConfig.externalIp)"')
-    else:
-        tpuvm_cmd = (f'gcloud compute tpus tpu-vm describe $({query_cmd})'
-                     f' --zone {zone} --format="value[delimiter=\'\\n\']'
-                     '(networkEndpoints.ipAddress)"')
-
-    rcode, stdout, stderr = log_lib.run_with_log(tpuvm_cmd,
-                                                 '/dev/null',
-                                                 shell=True,
-                                                 stream_logs=False,
-                                                 require_outputs=True)
-    if rcode != 0:
-        failure_massage = ('Failed to run gcloud to get TPU VM Pod IPs.\n'
+    returncode, stdout, stderr = log_lib.run_with_log(query_cmd,
+                                                      '/dev/null',
+                                                      shell=True,
+                                                      stream_logs=False,
+                                                      require_outputs=True)
+    if returncode != 0:
+        failure_massage = ('Failed to run gcloud to get TPU VM IDs.\n'
                            '**** STDOUT ****\n'
-                           '{stdout}\n'
+                           f'{stdout}\n'
                            '**** STDERR ****\n'
-                           '{stderr}\n'
+                           f'{stderr}\n'
                            '**** CMD ****\n'
-                           '{tpuvm_cmd}')
+                           f'{query_cmd}\n')
         with ux_utils.print_exception_no_traceback():
-            raise RuntimeError(
-                failure_massage.format(stdout=stdout,
-                                       stderr=stderr,
-                                       tpuvm_cmd=tpuvm_cmd))
-    all_ips = re.findall(IP_ADDR_REGEX, stdout)
+            raise RuntimeError(failure_massage)
+    if len(stdout) == 0:
+        logger.warning('No TPU VMs found with cluster name '
+                       f'{cluster_name} in zone {zone}.')
+    if len(stdout.splitlines()) > 1:
+        logger.warning('Found more than one TPU VM with cluster name '
+                       f'{cluster_name} in zone {zone}.')
+
+    all_ips = []
+    for tpu_id in stdout.splitlines():
+        tpuvm_cmd = (f'gcloud compute tpus tpu-vm describe {tpu_id}'
+                     f' --zone {zone} --format=json')
+        returncode, stdout, stderr = log_lib.run_with_log(tpuvm_cmd,
+                                                          '/dev/null',
+                                                          shell=True,
+                                                          stream_logs=False,
+                                                          require_outputs=True)
+        if returncode != 0:
+            failure_massage = ('Failed to run gcloud tpu-vm describe.\n'
+                               '**** STDOUT ****\n'
+                               f'{stdout}\n'
+                               '**** STDERR ****\n'
+                               f'{stderr}\n'
+                               '**** CMD ****\n'
+                               f'{tpuvm_cmd}\n')
+            with ux_utils.print_exception_no_traceback():
+                raise RuntimeError(failure_massage)
+
+        tpuvm_json = json.loads(stdout)
+        if tpuvm_json['state'] != 'READY':
+            # May be a leaked preempted resource.
+            logger.warning(f'TPU VM {tpu_id} is not in READY state.'
+                           'Could be a garbage resource. Skipping...')
+            continue
+
+        if not get_internal_ips:
+            ips = [
+                endpoint['accessConfig']['externalIp']
+                for endpoint in tpuvm_json['networkEndpoints']
+            ]
+        else:
+            ips = [
+                endpoint['ipAddress']
+                for endpoint in tpuvm_json['networkEndpoints']
+            ]
+        all_ips.extend(ips)
+
+    return all_ips
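The rewrite above trades regex scraping of gcloud output for structured JSON: each TPU VM is described with `--format=json` and the IPs are read from its `networkEndpoints` entries. A minimal, self-contained sketch of that extraction, using an illustrative payload (the field names come from the patch; the values are made up):

    import json

    # Illustrative describe output; fields mirror the patch above.
    sample = '''
    {
      "state": "READY",
      "networkEndpoints": [
        {"ipAddress": "10.0.0.2",
         "accessConfig": {"externalIp": "34.82.0.10"}}
      ]
    }
    '''

    tpuvm_json = json.loads(sample)
    if tpuvm_json['state'] == 'READY':
        internal = [e['ipAddress'] for e in tpuvm_json['networkEndpoints']]
        external = [
            e['accessConfig']['externalIp']
            for e in tpuvm_json['networkEndpoints']
        ]
        print(internal, external)  # ['10.0.0.2'] ['34.82.0.10']

Checking `state != 'READY'` before collecting IPs is what lets the safeguard skip leaked, preempted TPU VMs instead of returning their stale addresses.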
From a9340bf52ac06ed3ca8d3e0e119765a43f56509b Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Wed, 7 Dec 2022 23:12:39 -0800
Subject: [PATCH 02/16] terminate the cluster to be safe

---
 sky/spot/recovery_strategy.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/sky/spot/recovery_strategy.py b/sky/spot/recovery_strategy.py
index 3459cb2edf7..5c385fe8836 100644
--- a/sky/spot/recovery_strategy.py
+++ b/sky/spot/recovery_strategy.py
@@ -12,6 +12,7 @@
 from sky.spot import spot_utils
 from sky.usage import usage_lib
 from sky.utils import common_utils
+from sky.utils import tpu_utils
 from sky.utils import ux_utils
 
 if typing.TYPE_CHECKING:
@@ -305,6 +306,13 @@ def recover(self) -> float:
             new_resources = resources.copy(cloud=launched_cloud,
                                            region=launched_region)
             task.set_resources({new_resources})
+
+            # Clean up preempted TPU VM before launching the cluster.
+            # This is needed as status -r will not remove it if GCP
+            # turns the VM state to other than PREEMPTED.
+            is_tpuvm = tpu_utils.is_tpu_vm(new_resources)
+            if is_tpuvm:
+                self.terminate_cluster()
             # Not using self.launch to avoid the retry until up logic.
             launched_time = self._launch(raise_on_failure=False)
             # Restore the original dag, i.e. reset the region constraint.

From e24aa8ad4ec33d5381a4c3c9a8087e277530bd55 Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Thu, 8 Dec 2022 13:50:07 -0800
Subject: [PATCH 03/16] update

---
 sky/spot/controller.py        | 8 ++++++++
 sky/spot/recovery_strategy.py | 7 -------
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/sky/spot/controller.py b/sky/spot/controller.py
index deb7fc22fc5..ef9de5e1243 100644
--- a/sky/spot/controller.py
+++ b/sky/spot/controller.py
@@ -22,6 +22,7 @@
 from sky.spot import spot_utils
 from sky.utils import common_utils
 from sky.utils import subprocess_utils
+from sky.utils import tpu_utils
 
 logger = sky_logging.init_logger(__name__)
 
@@ -145,6 +146,13 @@ def _run(self):
                     'cluster is healthy. Try to recover the job '
                     '(the cluster will not be restarted).')
 
+                # Clean up preempted TPU VM before recovering the cluster.
+                # This is needed as "status -r" may not remove it if GCP
+                # turns the VM state to other than PREEMPTED.
+                is_tpuvm = tpu_utils.is_tpu_vm(list(self._task.resources)[0])
+                if is_tpuvm:
+                    self._strategy_executor.terminate_cluster()
+
                 # Try to recover the spot jobs, when the cluster is preempted
                 # or the job status is failed to be fetched.
                 spot_state.set_recovering(self._job_id)

diff --git a/sky/spot/recovery_strategy.py b/sky/spot/recovery_strategy.py
index 5c385fe8836..394b1378f55 100644
--- a/sky/spot/recovery_strategy.py
+++ b/sky/spot/recovery_strategy.py
@@ -307,12 +306,6 @@ def recover(self) -> float:
                                            region=launched_region)
             task.set_resources({new_resources})
 
-            # Clean up preempted TPU VM before launching the cluster.
-            # This is needed as status -r will not remove it if GCP
-            # turns the VM state to other than PREEMPTED.
-            is_tpuvm = tpu_utils.is_tpu_vm(new_resources)
-            if is_tpuvm:
-                self.terminate_cluster()
             # Not using self.launch to avoid the retry until up logic.
             launched_time = self._launch(raise_on_failure=False)
             # Restore the original dag, i.e. reset the region constraint.
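Patches 02 and 03 gate the extra terminate on `tpu_utils.is_tpu_vm`. For context, that helper (its full definition appears in patch 05 below) only inspects the resource's accelerator args. A self-contained sketch of the same test, with the args passed as a plain dict rather than a `Resources` object:

    from typing import Any, Dict, Optional

    def is_tpu_vm(accelerator_args: Optional[Dict[str, Any]]) -> bool:
        # Mirrors sky/utils/tpu_utils.py: a TPU VM is marked by a truthy
        # 'tpu_vm' flag in the accelerator args.
        if accelerator_args is None:
            return False
        return accelerator_args.get('tpu_vm', False)

    assert is_tpu_vm({'tpu_vm': True})
    assert not is_tpu_vm({'runtime_version': 'tpu-vm-base'})  # hypothetical args
    assert not is_tpu_vm(None)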
From 00111a51bf07a62a66f8259593e97c8b4265f9d0 Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Thu, 8 Dec 2022 13:50:41 -0800
Subject: [PATCH 04/16] rm

---
 sky/spot/recovery_strategy.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sky/spot/recovery_strategy.py b/sky/spot/recovery_strategy.py
index 394b1378f55..3459cb2edf7 100644
--- a/sky/spot/recovery_strategy.py
+++ b/sky/spot/recovery_strategy.py
@@ -305,7 +305,6 @@ def recover(self) -> float:
             new_resources = resources.copy(cloud=launched_cloud,
                                            region=launched_region)
             task.set_resources({new_resources})
-
             # Not using self.launch to avoid the retry until up logic.
             launched_time = self._launch(raise_on_failure=False)
             # Restore the original dag, i.e. reset the region constraint.
From 732298dba4364ac4104d5e5c59cd70673d9103fb Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Sat, 10 Dec 2022 13:27:03 -0800
Subject: [PATCH 05/16] better abstraction

---
 sky/backends/backend_utils.py        |  2 +-
 sky/backends/cloud_vm_ray_backend.py | 34 +++++++++++++++++++---------
 sky/clouds/aws.py                    |  8 +++++++
 sky/clouds/azure.py                  |  9 ++++++++
 sky/clouds/cloud.py                  |  4 ++++
 sky/clouds/gcp.py                    | 15 ++++++++++++
 sky/resources.py                     |  3 +++
 sky/spot/controller.py               | 12 +++++-----
 sky/utils/tpu_utils.py               | 17 +++++++-------
 9 files changed, 78 insertions(+), 26 deletions(-)

diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py
index e39926f5bb8..46f2934ea79 100644
--- a/sky/backends/backend_utils.py
+++ b/sky/backends/backend_utils.py
@@ -1273,7 +1273,7 @@ def _get_tpu_vm_pod_ips(ray_config: Dict[str, Any],
         tpuvm_json = json.loads(stdout)
         if tpuvm_json['state'] != 'READY':
             # May be a leaked preempted resource.
-            logger.warning(f'TPU VM {tpu_id} is not in READY state.'
+            logger.warning(f'TPU VM {tpu_id} is not in READY state. '
                            'Could be a garbage resource. Skipping...')
             continue
 
diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py
index c9f51452d4a..85700dc3143 100644
--- a/sky/backends/cloud_vm_ray_backend.py
+++ b/sky/backends/cloud_vm_ray_backend.py
@@ -2657,6 +2657,7 @@ def teardown_no_lock(self,
         elif (terminate and
               (prev_status == global_user_state.ClusterStatus.STOPPED or
                use_tpu_vm)):
+            terminate_cmds = []
             # For TPU VMs, gcloud CLI is used for VM termination.
             if isinstance(cloud, clouds.AWS):
                 # TODO(zhwu): Room for optimization. We can move these cloud
@@ -2669,7 +2670,7 @@ def teardown_no_lock(self,
                     f'Name=tag:ray-cluster-name,Values={handle.cluster_name} '
                     f'--query Reservations[].Instances[].InstanceId '
                     '--output text')
-                terminate_cmd = (
+                terminate_cmds.append(
                     f'aws ec2 terminate-instances --region {region} '
                     f'--instance-ids $({query_cmd})')
             elif isinstance(cloud, clouds.GCP):
@@ -2684,15 +2685,25 @@ def teardown_no_lock(self,
                         f'gcloud compute tpus tpu-vm list --filter='
                         f'\\(labels.ray-cluster-name={cluster_name}\\) '
                         f'--zone={zone} --format=value\\(name\\)')
-                    terminate_cmd = (
-                        f'gcloud compute tpus tpu-vm delete --zone={zone}'
-                        f' --quiet $({query_cmd})')
+                    returncode, stdout, stderr = log_lib.run_with_log(
+                        query_cmd,
+                        log_abs_path,
+                        shell=True,
+                        stream_logs=False,
+                        require_outputs=True)
+
+                    # Needs to create a list as GCP does not allow deleting
+                    # multiple TPU VMs at once
+                    for tpu_id in stdout.splitlines():
+                        terminate_cmds.append(
+                            f'gcloud compute tpus tpu-vm delete --zone={zone} '
+                            f'--quiet {tpu_id}')
                 else:
                     query_cmd = (
                         f'gcloud compute instances list --filter='
                         f'\\(labels.ray-cluster-name={cluster_name}\\) '
                         f'--zones={zone} --format=value\\(name\\)')
-                    terminate_cmd = (
+                    terminate_cmds.append(
                         f'gcloud compute instances delete --zone={zone}'
                         f' --quiet $({query_cmd})')
@@ -2701,12 +2712,13 @@ def teardown_no_lock(self,
             else:
                 raise ValueError(f'Unsupported cloud {cloud} for deleting '
                                  f'cluster {cluster_name!r}.')
             with backend_utils.safe_console_status(f'[bold cyan]Terminating '
                                                    f'[green]{cluster_name}'):
-                returncode, stdout, stderr = log_lib.run_with_log(
-                    terminate_cmd,
-                    log_abs_path,
-                    shell=True,
-                    stream_logs=False,
-                    require_outputs=True)
+                for terminate_cmd in terminate_cmds:
+                    returncode, stdout, stderr = log_lib.run_with_log(
+                        terminate_cmd,
+                        log_abs_path,
+                        shell=True,
+                        stream_logs=False,
+                        require_outputs=True)
         else:
             config['provider']['cache_stopped_nodes'] = not terminate
             with tempfile.NamedTemporaryFile('w',
diff --git a/sky/clouds/aws.py b/sky/clouds/aws.py
index 4d1a5bdb7b6..d73a1c44af2 100644
--- a/sky/clouds/aws.py
+++ b/sky/clouds/aws.py
@@ -351,3 +351,11 @@ def accelerator_in_region_or_zone(self,
                                   zone: Optional[str] = None) -> bool:
         return service_catalog.accelerator_in_region_or_zone(
             accelerator, acc_count, region, zone, 'aws')
+
+    def is_spot_restartable(self, resources: 'resources_lib.Resources') -> bool:
+        """Returns whether a spot instance can be restarted after preemption."""
+        # By default, AWS Spot instances are not restartable after preemption.
+        # "Terminate interrupted Spot Instances (this is the default behavior)"
+        # See: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/interruption-behavior.html # pylint: disable=line-too-long
+        del resources  # unused
+        return False
diff --git a/sky/clouds/azure.py b/sky/clouds/azure.py
index 8c4adc744e3..e448c20342e 100644
--- a/sky/clouds/azure.py
+++ b/sky/clouds/azure.py
@@ -305,6 +305,15 @@ def accelerator_in_region_or_zone(self,
         return service_catalog.accelerator_in_region_or_zone(
             accelerator, acc_count, region, zone, 'azure')
 
+    def is_spot_restartable(self, resources: 'resources.Resources') -> bool:
+        """Returns whether a spot instance can be restarted after preemption."""
+        # By default, Azure Spot instances are restartable after preemption.
+        # "When creating an Azure Spot Virtual Machine, you can set
+        # the eviction policy to Deallocate (default) or Delete."
+        # See: https://learn.microsoft.com/en-us/azure/virtual-machines/spot-vms#eviction-policy # pylint: disable=line-too-long
+        del resources  # Unused.
+        return True
+
     @classmethod
     def get_project_id(cls, dryrun: bool = False) -> str:
         if dryrun:
diff --git a/sky/clouds/cloud.py b/sky/clouds/cloud.py
index f7530561165..eb26515c94c 100644
--- a/sky/clouds/cloud.py
+++ b/sky/clouds/cloud.py
@@ -214,5 +214,9 @@ def accelerator_in_region_or_zone(self,
         """Returns whether the accelerator is valid in the region or zone."""
         raise NotImplementedError
 
+    def is_spot_restartable(self, resource: 'resources.Resources') -> bool:
+        """Returns whether a spot instance can be restarted after preemption."""
+        raise NotImplementedError
+
     def __repr__(self):
         return self._REPR
diff --git a/sky/clouds/gcp.py b/sky/clouds/gcp.py
index e176e20d2e8..a6b159f2c33 100644
--- a/sky/clouds/gcp.py
+++ b/sky/clouds/gcp.py
@@ -468,6 +468,21 @@ def accelerator_in_region_or_zone(self,
         return service_catalog.accelerator_in_region_or_zone(
             accelerator, acc_count, region, zone, 'gcp')
 
+    def is_spot_restartable(self, resources: 'resources.Resources') -> bool:
+        """Returns whether a spot instance can be restarted after preemption."""
+        # By default, GCP Compute VMs are restartable after preemption.
+        # "If ... not specified, then Compute Engine stops the VM,
+        # transitioning the VM to a TERMINATED state."
+        # See: https://cloud.google.com/compute/docs/instances/spot#preemption-process # pylint: disable=line-too-long
+        # However, Spot TPU VMs are not restartable after preemption.
+        # "If your Cloud TPU is preempted,
+        # you must delete it and create a new one ..."
+        # See: https://cloud.google.com/tpu/docs/preemptible#tpu-vm
+
+        # pylint: disable=import-outside-toplevel
+        from sky.utils import tpu_utils
+        return not tpu_utils.is_tpu_vm(resources)
+
     @classmethod
     def get_project_id(cls, dryrun: bool = False) -> str:
         # TODO(zhwu): change the project id fetching with the following command
diff --git a/sky/resources.py b/sky/resources.py
index 71754ec9ef7..b1c3b13d53b 100644
--- a/sky/resources.py
+++ b/sky/resources.py
@@ -260,6 +260,9 @@ def _set_accelerators(
     def is_launchable(self) -> bool:
         return self.cloud is not None and self._instance_type is not None
 
+    def is_spot_restartable(self) -> bool:
+        return self.cloud.is_spot_restartable(self)
+
     def _set_region_zone(self, region: Optional[str],
                          zone: Optional[str]) -> None:
         if region is None and zone is None:
diff --git a/sky/spot/controller.py b/sky/spot/controller.py
index ef9de5e1243..672507512cb 100644
--- a/sky/spot/controller.py
+++ b/sky/spot/controller.py
@@ -22,7 +22,6 @@
 from sky.spot import spot_utils
 from sky.utils import common_utils
 from sky.utils import subprocess_utils
-from sky.utils import tpu_utils
 
 logger = sky_logging.init_logger(__name__)
 
@@ -146,11 +145,12 @@ def _run(self):
                     'cluster is healthy. Try to recover the job '
                     '(the cluster will not be restarted).')
 
-                # Clean up preempted TPU VM before recovering the cluster.
-                # This is needed as "status -r" may not remove it if GCP
-                # turns the VM state to other than PREEMPTED.
-                is_tpuvm = tpu_utils.is_tpu_vm(list(self._task.resources)[0])
-                if is_tpuvm:
+                resources = list(self._task.resources)[0]
+                if not resources.is_spot_restartable():
+                    # If the resource is not restartable after preemption,
+                    # we need to terminate the cluster before recovering it.
+                    logger.info('Resource not restartable. Cleaning up '
+                                'the cluster.')
                     self._strategy_executor.terminate_cluster()
 
                 # Try to recover the spot jobs, when the cluster is preempted
diff --git a/sky/utils/tpu_utils.py b/sky/utils/tpu_utils.py
index 70f2b05b117..72f89f9ed54 100644
--- a/sky/utils/tpu_utils.py
+++ b/sky/utils/tpu_utils.py
@@ -4,28 +4,29 @@
 from sky import resources as resources_lib
 
 
-def is_tpu(resources: resources_lib.Resources) -> bool:
-    if resources.accelerators is None:
+def is_tpu(resources: Optional[resources_lib.Resources]) -> bool:
+    if resources is None or resources.accelerators is None:
         return False
     acc, _ = list(resources.accelerators.items())[0]
     return acc.startswith('tpu')
 
 
-def is_tpu_vm(resources: resources_lib.Resources) -> bool:
-    if resources.accelerator_args is None:
+def is_tpu_vm(resources: Optional[resources_lib.Resources]) -> bool:
+    if resources is None or resources.accelerator_args is None:
         return False
     return resources.accelerator_args.get('tpu_vm', False)
 
 
-def is_tpu_vm_pod(resources: resources_lib.Resources) -> bool:
-    if not is_tpu_vm(resources):
+def is_tpu_vm_pod(resources: Optional[resources_lib.Resources]) -> bool:
+    if resources is None or not is_tpu_vm(resources):
         return False
     acc, _ = list(resources.accelerators.items())[0]
     return acc not in ['tpu-v2-8', 'tpu-v3-8']
 
 
-def get_num_tpu_devices(resources: resources_lib.Resources) -> Optional[int]:
-    if not is_tpu(resources):
+def get_num_tpu_devices(
+        resources: Optional[resources_lib.Resources]) -> Optional[int]:
+    if resources is None or not is_tpu(resources):
         return None
     acc, _ = list(resources.accelerators.items())[0]
     num_tpu_devices = int(int(acc.split('-')[2]) / 8)
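Patch 05's key move is pushing the TPU special case behind a per-cloud method, so callers ask the resource rather than importing `tpu_utils`. A stripped-down sketch of the dispatch, with toy classes standing in for `sky.clouds` and `sky.resources`:

    class Cloud:
        def is_spot_restartable(self, resources: 'Resources') -> bool:
            raise NotImplementedError

    class AWS(Cloud):
        def is_spot_restartable(self, resources: 'Resources') -> bool:
            return False  # interrupted Spot instances are terminated

    class GCP(Cloud):
        def is_spot_restartable(self, resources: 'Resources') -> bool:
            return not resources.is_tpu_vm  # Spot TPU VMs must be recreated

    class Resources:
        def __init__(self, cloud: Cloud, is_tpu_vm: bool = False):
            self.cloud = cloud
            self.is_tpu_vm = is_tpu_vm

        def is_spot_restartable(self) -> bool:
            # Same delegation as the sky/resources.py hunk above.
            return self.cloud.is_spot_restartable(self)

    assert not Resources(GCP(), is_tpu_vm=True).is_spot_restartable()
    assert Resources(GCP()).is_spot_restartable()
    assert not Resources(AWS()).is_spot_restartable()

The controller then needs only `resources.is_spot_restartable()`, which is exactly what the `sky/spot/controller.py` hunk above switches to.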
From b147c9ac755af7351b31d91ed5c7520bba2d07f4 Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Sat, 10 Dec 2022 13:31:26 -0800
Subject: [PATCH 06/16] comment

---
 sky/resources.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sky/resources.py b/sky/resources.py
index b1c3b13d53b..8e570a39b8b 100644
--- a/sky/resources.py
+++ b/sky/resources.py
@@ -261,6 +261,7 @@ def is_launchable(self) -> bool:
         return self.cloud is not None and self._instance_type is not None
 
     def is_spot_restartable(self) -> bool:
+        """Returns whether the resource is restartable after preeemption."""
         return self.cloud.is_spot_restartable(self)
 
     def _set_region_zone(self, region: Optional[str],

From 94d32149a53b4a7e56d41223dbaa4f04d8e66b9b Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Mon, 12 Dec 2022 11:43:43 -0800
Subject: [PATCH 07/16] comments

---
 sky/spot/recovery_strategy.py | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/sky/spot/recovery_strategy.py b/sky/spot/recovery_strategy.py
index 3459cb2edf7..de6b8eeb213 100644
--- a/sky/spot/recovery_strategy.py
+++ b/sky/spot/recovery_strategy.py
@@ -276,10 +276,6 @@ def _launch(self, max_retry=3, raise_on_failure=True) -> Optional[float]:
                                           launched_resources.region)
         return launch_time
 
-    def terminate_cluster(self, max_retry: int = 3) -> None:
-        super().terminate_cluster(max_retry)
-        self._launched_cloud_region = None
-
     def recover(self) -> float:
         # 1. Cancel the jobs and launch the cluster with the STOPPED status,
         #    so that it will try on the current region first until timeout.
@@ -305,7 +305,6 @@ def recover(self) -> float:
             return launched_time
 
         # Step 2
-        logger.debug('Terminating unhealthy spot cluster.')
+        logger.debug('Terminating unhealthy spot cluster and '
+                     'reset cloud region.')
+        self._launched_cloud_region = None
         self.terminate_cluster()
 
         # Step 3
From fdf1942bdf91d1bb08524c6a1277a9a53157855c Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Mon, 12 Dec 2022 11:59:41 -0800
Subject: [PATCH 08/16] rename

---
 sky/clouds/aws.py      | 5 +++--
 sky/clouds/azure.py    | 5 +++--
 sky/clouds/cloud.py    | 5 +++--
 sky/clouds/gcp.py      | 5 +++--
 sky/resources.py       | 6 +++---
 sky/spot/controller.py | 9 ++++-----
 6 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/sky/clouds/aws.py b/sky/clouds/aws.py
index d73a1c44af2..d7f5ad89812 100644
--- a/sky/clouds/aws.py
+++ b/sky/clouds/aws.py
@@ -352,8 +352,9 @@ def accelerator_in_region_or_zone(self,
         return service_catalog.accelerator_in_region_or_zone(
             accelerator, acc_count, region, zone, 'aws')
 
-    def is_spot_restartable(self, resources: 'resources_lib.Resources') -> bool:
-        """Returns whether a spot instance can be restarted after preemption."""
+    def need_cleanup_after_preemption(
+            self, resources: 'resources_lib.Resources') -> bool:
+        """Returns whether a spot resource needs cleanup after preeemption."""
         # By default, AWS Spot instances are not restartable after preemption.
         # "Terminate interrupted Spot Instances (this is the default behavior)"
         # See: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/interruption-behavior.html # pylint: disable=line-too-long
diff --git a/sky/clouds/azure.py b/sky/clouds/azure.py
index e448c20342e..e083141538d 100644
--- a/sky/clouds/azure.py
+++ b/sky/clouds/azure.py
@@ -305,8 +305,9 @@ def accelerator_in_region_or_zone(self,
         return service_catalog.accelerator_in_region_or_zone(
             accelerator, acc_count, region, zone, 'azure')
 
-    def is_spot_restartable(self, resources: 'resources.Resources') -> bool:
-        """Returns whether a spot instance can be restarted after preemption."""
+    def need_cleanup_after_preemption(self,
+                                      resources: 'resources.Resources') -> bool:
+        """Returns whether a spot resource needs cleanup after preeemption."""
         # By default, Azure Spot instances are restartable after preemption.
         # "When creating an Azure Spot Virtual Machine, you can set
         # the eviction policy to Deallocate (default) or Delete."
diff --git a/sky/clouds/cloud.py b/sky/clouds/cloud.py
index eb26515c94c..6542b7298c0 100644
--- a/sky/clouds/cloud.py
+++ b/sky/clouds/cloud.py
@@ -214,8 +214,9 @@ def accelerator_in_region_or_zone(self,
         """Returns whether the accelerator is valid in the region or zone."""
         raise NotImplementedError
 
-    def is_spot_restartable(self, resource: 'resources.Resources') -> bool:
-        """Returns whether a spot instance can be restarted after preemption."""
+    def need_cleanup_after_preemption(self,
+                                      resource: 'resources.Resources') -> bool:
+        """Returns whether a spot resource needs cleanup after preeemption."""
         raise NotImplementedError
 
     def __repr__(self):
diff --git a/sky/clouds/gcp.py b/sky/clouds/gcp.py
index a6b159f2c33..40c2c0a88ae 100644
--- a/sky/clouds/gcp.py
+++ b/sky/clouds/gcp.py
@@ -468,8 +468,9 @@ def accelerator_in_region_or_zone(self,
         return service_catalog.accelerator_in_region_or_zone(
             accelerator, acc_count, region, zone, 'gcp')
 
-    def is_spot_restartable(self, resources: 'resources.Resources') -> bool:
-        """Returns whether a spot instance can be restarted after preemption."""
+    def need_cleanup_after_preemption(self,
+                                      resources: 'resources.Resources') -> bool:
+        """Returns whether a spot resource needs cleanup after preeemption."""
         # By default, GCP Compute VMs are restartable after preemption.
         # "If ... not specified, then Compute Engine stops the VM,
         # transitioning the VM to a TERMINATED state."
diff --git a/sky/resources.py b/sky/resources.py
index 8e570a39b8b..e112cf700db 100644
--- a/sky/resources.py
+++ b/sky/resources.py
@@ -260,9 +260,9 @@ def _set_accelerators(
     def is_launchable(self) -> bool:
         return self.cloud is not None and self._instance_type is not None
 
-    def is_spot_restartable(self) -> bool:
-        """Returns whether the resource is restartable after preeemption."""
-        return self.cloud.is_spot_restartable(self)
+    def need_cleanup_after_preemption(self) -> bool:
+        """Returns whether a spot resource needs cleanup after preeemption."""
+        return self.cloud.need_cleanup_after_preemption(self)
 
     def _set_region_zone(self, region: Optional[str],
                          zone: Optional[str]) -> None:
diff --git a/sky/spot/controller.py b/sky/spot/controller.py
index 672507512cb..a61d338f304 100644
--- a/sky/spot/controller.py
+++ b/sky/spot/controller.py
@@ -146,11 +146,10 @@ def _run(self):
                     '(the cluster will not be restarted).')
 
                 resources = list(self._task.resources)[0]
-                if not resources.is_spot_restartable():
-                    # If the resource is not restartable after preemption,
-                    # we need to terminate the cluster before recovering it.
-                    logger.info('Resource not restartable. Cleaning up '
-                                'the cluster.')
+                if not resources.need_cleanup_after_preemption():
+                    # Some spot resource may need to be cleaned up after
+                    # preemption, if the resource is not reusable.
+                    logger.info('Cleaning up the preempted spot cluster...')
                     self._strategy_executor.terminate_cluster()
 
                 # Try to recover the spot jobs, when the cluster is preempted
@@ -313,7 +309,9 @@ def recover(self) -> float: return launched_time # Step 2 - logger.debug('Terminating unhealthy spot cluster.') + logger.debug('Terminating unhealthy spot cluster and ' + 'reset cloud region.') + self._launched_cloud_region = None self.terminate_cluster() # Step 3 From fdf1942bdf91d1bb08524c6a1277a9a53157855c Mon Sep 17 00:00:00 2001 From: Wei-Lin Chiang Date: Mon, 12 Dec 2022 11:59:41 -0800 Subject: [PATCH 08/16] rename --- sky/clouds/aws.py | 5 +++-- sky/clouds/azure.py | 5 +++-- sky/clouds/cloud.py | 5 +++-- sky/clouds/gcp.py | 5 +++-- sky/resources.py | 6 +++--- sky/spot/controller.py | 9 ++++----- 6 files changed, 19 insertions(+), 16 deletions(-) diff --git a/sky/clouds/aws.py b/sky/clouds/aws.py index d73a1c44af2..d7f5ad89812 100644 --- a/sky/clouds/aws.py +++ b/sky/clouds/aws.py @@ -352,8 +352,9 @@ def accelerator_in_region_or_zone(self, return service_catalog.accelerator_in_region_or_zone( accelerator, acc_count, region, zone, 'aws') - def is_spot_restartable(self, resources: 'resources_lib.Resources') -> bool: - """Returns whether a spot instance can be restarted after preemption.""" + def need_cleanup_after_preemption( + self, resources: 'resources_lib.Resources') -> bool: + """Returns whether a spot resource needs cleanup after preeemption.""" # By default, AWS Spot instances are not restartable after preemption. # "Terminate interrupted Spot Instances (this is the default behavior)" # See: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/interruption-behavior.html # pylint: disable=line-too-long diff --git a/sky/clouds/azure.py b/sky/clouds/azure.py index e448c20342e..e083141538d 100644 --- a/sky/clouds/azure.py +++ b/sky/clouds/azure.py @@ -305,8 +305,9 @@ def accelerator_in_region_or_zone(self, return service_catalog.accelerator_in_region_or_zone( accelerator, acc_count, region, zone, 'azure') - def is_spot_restartable(self, resources: 'resources.Resources') -> bool: - """Returns whether a spot instance can be restarted after preemption.""" + def need_cleanup_after_preemption(self, + resources: 'resources.Resources') -> bool: + """Returns whether a spot resource needs cleanup after preeemption.""" # By default, Azure Spot instances are restartable after preemption. # "When creating an Azure Spot Virtual Machine, you can set # the eviction policy to Deallocate (default) or Delete." 
From e8cca2a2929de43d422e060937553d989a418705 Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Tue, 13 Dec 2022 19:29:58 -0800
Subject: [PATCH 10/16] comment

---
 sky/backends/cloud_vm_ray_backend.py | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py
index 85700dc3143..d6276a691d1 100644
--- a/sky/backends/cloud_vm_ray_backend.py
+++ b/sky/backends/cloud_vm_ray_backend.py
@@ -2657,7 +2657,6 @@ def teardown_no_lock(self,
         elif (terminate and
               (prev_status == global_user_state.ClusterStatus.STOPPED or
                use_tpu_vm)):
-            terminate_cmds = []
             # For TPU VMs, gcloud CLI is used for VM termination.
             if isinstance(cloud, clouds.AWS):
                 # TODO(zhwu): Room for optimization. We can move these cloud
@@ -2670,7 +2669,7 @@ def teardown_no_lock(self,
                     f'Name=tag:ray-cluster-name,Values={handle.cluster_name} '
                     f'--query Reservations[].Instances[].InstanceId '
                     '--output text')
-                terminate_cmds.append(
+                terminate_cmd = (
                     f'aws ec2 terminate-instances --region {region} '
                     f'--instance-ids $({query_cmd})')
             elif isinstance(cloud, clouds.GCP):
@@ -2694,16 +2693,18 @@ def teardown_no_lock(self,
                         require_outputs=True)
 
                     # Needs to create a list as GCP does not allow deleting
                     # multiple TPU VMs at once
+                    tpu_terminate_cmds = []
                     for tpu_id in stdout.splitlines():
-                        terminate_cmds.append(
+                        tpu_terminate_cmds.append(
                             f'gcloud compute tpus tpu-vm delete --zone={zone} '
                             f'--quiet {tpu_id}')
+                    terminate_cmd = ' && '.join(tpu_terminate_cmds)
                 else:
                     query_cmd = (
                         f'gcloud compute instances list --filter='
                         f'\\(labels.ray-cluster-name={cluster_name}\\) '
                         f'--zones={zone} --format=value\\(name\\)')
-                    terminate_cmds.append(
+                    terminate_cmd = (
                         f'gcloud compute instances delete --zone={zone}'
                         f' --quiet $({query_cmd})')
@@ -2712,13 +2713,12 @@ def teardown_no_lock(self,
                                  f'cluster {cluster_name!r}.')
             with backend_utils.safe_console_status(f'[bold cyan]Terminating '
                                                    f'[green]{cluster_name}'):
-                for terminate_cmd in terminate_cmds:
-                    returncode, stdout, stderr = log_lib.run_with_log(
-                        terminate_cmd,
-                        log_abs_path,
-                        shell=True,
-                        stream_logs=False,
-                        require_outputs=True)
+                returncode, stdout, stderr = log_lib.run_with_log(
+                    terminate_cmd,
+                    log_abs_path,
+                    shell=True,
+                    stream_logs=False,
+                    require_outputs=True)
         else:
             config['provider']['cache_stopped_nodes'] = not terminate
             with tempfile.NamedTemporaryFile('w',
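To make the single-returncode path concrete, here is the command composition from the hunk above in isolation, with a hypothetical zone and hypothetical TPU IDs:

    zone = 'us-central1-b'        # hypothetical
    tpu_ids = ['tpu-a', 'tpu-b']  # hypothetical output of the query command

    tpu_terminate_cmds = []
    for tpu_id in tpu_ids:
        tpu_terminate_cmds.append(
            f'gcloud compute tpus tpu-vm delete --zone={zone} '
            f'--quiet {tpu_id}')
    terminate_cmd = ' && '.join(tpu_terminate_cmds)
    print(terminate_cmd)
    # gcloud compute tpus tpu-vm delete --zone=us-central1-b --quiet tpu-a
    #   && gcloud compute tpus tpu-vm delete --zone=us-central1-b --quiet tpu-b

Joining with ' && ' keeps one `run_with_log` invocation and one return code, and a later delete runs only if the earlier ones succeeded.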
From 464336fa9d3f469d9a2ea5658ab3b39835b876d5 Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Tue, 13 Dec 2022 22:05:47 -0800
Subject: [PATCH 11/16] msg

---
 sky/backends/backend_utils.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py
index ed097980f73..5e626ca97d5 100644
--- a/sky/backends/backend_utils.py
+++ b/sky/backends/backend_utils.py
@@ -1239,9 +1239,10 @@ def _get_tpu_vm_pod_ips(ray_config: Dict[str, Any],
         'Failed to run gcloud to get TPU VM IDs.',
         stderr=stdout + stderr)
     if len(stdout) == 0:
-        logger.warning('No TPU VMs found with cluster name '
-                       f'{cluster_name} in zone {zone}.')
+        logger.debug('No TPU VMs found with cluster name '
+                     f'{cluster_name} in zone {zone}.')
     if len(stdout.splitlines()) > 1:
+        # Rare case, this could mean resource leakage. Hint user.
         logger.warning('Found more than one TPU VM/Pod with the same cluster '
                        f'name {cluster_name} in zone {zone}.')
 
- # "Terminate interrupted Spot Instances (this is the default behavior)" - # See: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/interruption-behavior.html # pylint: disable=line-too-long - del resources # unused - return False diff --git a/sky/clouds/azure.py b/sky/clouds/azure.py index e083141538d..8c4adc744e3 100644 --- a/sky/clouds/azure.py +++ b/sky/clouds/azure.py @@ -305,16 +305,6 @@ def accelerator_in_region_or_zone(self, return service_catalog.accelerator_in_region_or_zone( accelerator, acc_count, region, zone, 'azure') - def need_cleanup_after_preemption(self, - resources: 'resources.Resources') -> bool: - """Returns whether a spot resource needs cleanup after preeemption.""" - # By default, Azure Spot instances are restartable after preemption. - # "When creating an Azure Spot Virtual Machine, you can set - # the eviction policy to Deallocate (default) or Delete." - # See: https://learn.microsoft.com/en-us/azure/virtual-machines/spot-vms#eviction-policy # pylint: disable=line-too-long - del resources # Unused. - return True - @classmethod def get_project_id(cls, dryrun: bool = False) -> str: if dryrun: diff --git a/sky/clouds/cloud.py b/sky/clouds/cloud.py index 6542b7298c0..63f97e41966 100644 --- a/sky/clouds/cloud.py +++ b/sky/clouds/cloud.py @@ -216,8 +216,14 @@ def accelerator_in_region_or_zone(self, def need_cleanup_after_preemption(self, resource: 'resources.Resources') -> bool: - """Returns whether a spot resource needs cleanup after preeemption.""" - raise NotImplementedError + """Returns whether a spot resource needs cleanup after preeemption. + + In most cases, spot resources do not need cleanup after preemption. + The only exception by far is GCP's Spot TPU VM. We override this method + in gcp.py. + """ + del resource + return False def __repr__(self): return self._REPR diff --git a/sky/clouds/gcp.py b/sky/clouds/gcp.py index 40c2c0a88ae..36514fed98d 100644 --- a/sky/clouds/gcp.py +++ b/sky/clouds/gcp.py @@ -471,11 +471,7 @@ def accelerator_in_region_or_zone(self, def need_cleanup_after_preemption(self, resources: 'resources.Resources') -> bool: """Returns whether a spot resource needs cleanup after preeemption.""" - # By default, GCP Compute VMs are restartable after preemption. - # "If ... not specified, then Compute Engine stops the VM, - # transitioning the VM to a TERMINATED state." - # See: https://cloud.google.com/compute/docs/instances/spot#preemption-process # pylint: disable=line-too-long - # However, Spot TPU VMs are not restartable after preemption. + # Spot TPU VMs require manual cleanup after preemption. # "If your Cloud TPU is preempted, # you must delete it and create a new one ..." # See: https://cloud.google.com/tpu/docs/preemptible#tpu-vm From e8cca2a2929de43d422e060937553d989a418705 Mon Sep 17 00:00:00 2001 From: Wei-Lin Chiang Date: Tue, 13 Dec 2022 19:29:58 -0800 Subject: [PATCH 10/16] comment --- sky/backends/cloud_vm_ray_backend.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 85700dc3143..d6276a691d1 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -2657,7 +2657,6 @@ def teardown_no_lock(self, elif (terminate and (prev_status == global_user_state.ClusterStatus.STOPPED or use_tpu_vm)): - terminate_cmds = [] # For TPU VMs, gcloud CLI is used for VM termination. if isinstance(cloud, clouds.AWS): # TODO(zhwu): Room for optimization. 
From a1536c23bd378ce7e8d16d8baa08f857ff128e63 Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Tue, 13 Dec 2022 22:23:31 -0800
Subject: [PATCH 13/16] bug..

---
 sky/spot/controller.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sky/spot/controller.py b/sky/spot/controller.py
index a61d338f304..ab885b3bce1 100644
--- a/sky/spot/controller.py
+++ b/sky/spot/controller.py
@@ -146,7 +146,7 @@ def _run(self):
                     '(the cluster will not be restarted).')
 
                 resources = list(self._task.resources)[0]
-                if not resources.need_cleanup_after_preemption():
+                if resources.need_cleanup_after_preemption():
                     # Some spot resource may need to be cleaned up after
                     # preemption, if the resource is not reusable.
                     logger.info('Cleaning up the preempted spot cluster...')

From bb34e27068077e96aad89c89918e64441420663b Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Tue, 13 Dec 2022 22:24:34 -0800
Subject: [PATCH 14/16] msg

---
 sky/spot/controller.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sky/spot/controller.py b/sky/spot/controller.py
index ab885b3bce1..7a59e4900eb 100644
--- a/sky/spot/controller.py
+++ b/sky/spot/controller.py
@@ -147,8 +147,8 @@ def _run(self):
 
                 resources = list(self._task.resources)[0]
                 if resources.need_cleanup_after_preemption():
-                    # Some spot resource may need to be cleaned up after
-                    # preemption, if the resource is not reusable.
+                    # Some spot resource (e.g., Spot TPU VM) may need to be
+                    # cleaned up after preemption.
                     logger.info('Cleaning up the preempted spot cluster...')
                     self._strategy_executor.terminate_cluster()
From 6e03445158a58e16926e9110bcfbdfb215c40c45 Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Tue, 13 Dec 2022 22:26:21 -0800
Subject: [PATCH 15/16] miss one place

---
 sky/clouds/gcp.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sky/clouds/gcp.py b/sky/clouds/gcp.py
index 36514fed98d..fffd1c9a5ed 100644
--- a/sky/clouds/gcp.py
+++ b/sky/clouds/gcp.py
@@ -478,7 +478,7 @@ def need_cleanup_after_preemption(self,
 
         # pylint: disable=import-outside-toplevel
         from sky.utils import tpu_utils
-        return not tpu_utils.is_tpu_vm(resources)
+        return tpu_utils.is_tpu_vm(resources)
 
     @classmethod
     def get_project_id(cls, dryrun: bool = False) -> str:
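With the sign fix in patch 13 and the `not` dropped here in patch 15, the predicate finally encodes the intended rule: cleanup is needed exactly for GCP Spot TPU VMs. A toy check of that truth table (plain booleans standing in for the real cloud and resource objects):

    def need_cleanup_after_preemption(is_gcp: bool, is_tpu_vm: bool) -> bool:
        # Base Cloud default is False; only the GCP override consults
        # whether the resource is a TPU VM.
        return is_gcp and is_tpu_vm

    assert need_cleanup_after_preemption(is_gcp=True, is_tpu_vm=True)
    assert not need_cleanup_after_preemption(is_gcp=True, is_tpu_vm=False)
    assert not need_cleanup_after_preemption(is_gcp=False, is_tpu_vm=False)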
From 9fee56983e0b9e1b9c0ad83c4f659ad2821206d0 Mon Sep 17 00:00:00 2001
From: Wei-Lin Chiang
Date: Tue, 13 Dec 2022 22:41:39 -0800
Subject: [PATCH 16/16] output error msg

---
 sky/backends/cloud_vm_ray_backend.py | 25 +++++++++++++++----------
 1 file changed, 15 insertions(+), 10 deletions(-)

diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py
index 18419b44e2f..a3c7371da56 100644
--- a/sky/backends/cloud_vm_ray_backend.py
+++ b/sky/backends/cloud_vm_ray_backend.py
@@ -2691,17 +2691,22 @@ def teardown_no_lock(self,
                         stream_logs=False,
                         require_outputs=True)
 
-                    # Needs to create a list as GCP does not allow deleting
-                    # multiple TPU VMs at once.
-                    # Skip the termination commands, if the TPU ID
+                    # Skip the termination command, if the TPU ID
                     # query command fails.
-                    tpu_terminate_cmds = [f'exit {returncode}'
-                                         ] if returncode != 0 else []
-                    for tpu_id in stdout.splitlines():
-                        tpu_terminate_cmds.append(
-                            f'gcloud compute tpus tpu-vm delete --zone={zone} '
-                            f'--quiet {tpu_id}')
-                    terminate_cmd = ' && '.join(tpu_terminate_cmds)
+                    if returncode != 0:
+                        terminate_cmd = (f'echo "cmd: {query_cmd}" && '
+                                         f'echo "{stdout}" && '
+                                         f'echo "{stderr}" >&2 && '
+                                         f'exit {returncode}')
+                    else:
+                        # Needs to create a list as GCP does not allow deleting
+                        # multiple TPU VMs at once.
+                        tpu_terminate_cmds = []
+                        for tpu_id in stdout.splitlines():
+                            tpu_terminate_cmds.append(
+                                'gcloud compute tpus tpu-vm delete '
+                                f'--zone={zone} --quiet {tpu_id}')
+                        terminate_cmd = ' && '.join(tpu_terminate_cmds)
                 else:
                     query_cmd = (
                         f'gcloud compute instances list --filter='
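This last patch keeps the single-command path but makes a failed ID query self-describing: the substituted command echoes the original query, its stdout, and its stderr (routed to fd 2), then exits with the preserved code, so the diagnostics land in the teardown log. Composed in isolation with hypothetical values:

    query_cmd = ('gcloud compute tpus tpu-vm list '
                 '--filter=\\(labels.ray-cluster-name=mycluster\\) '
                 '--zone=us-central1-b --format=value\\(name\\)')  # hypothetical
    returncode, stdout, stderr = 1, '', 'PERMISSION_DENIED'        # hypothetical

    if returncode != 0:
        terminate_cmd = (f'echo "cmd: {query_cmd}" && '
                         f'echo "{stdout}" && '
                         f'echo "{stderr}" >&2 && '
                         f'exit {returncode}')
        print(terminate_cmd)
    # echo "cmd: gcloud compute tpus tpu-vm list ..." && echo "" &&
    # echo "PERMISSION_DENIED" >&2 && exit 1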