From 4318efe1000785d6fa52432d76eb5fb17cfe5658 Mon Sep 17 00:00:00 2001 From: Isaac Ong Date: Wed, 30 Nov 2022 22:16:29 -0800 Subject: [PATCH 01/11] Better handling of old clusters --- sky/backends/backend_utils.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index 3e00fcd3aac..85a6e313e5a 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -1693,8 +1693,10 @@ def _update_cluster_status_no_lock( # that the cluster is partially preempted. # TODO(zhwu): the definition of INIT should be audited/changed. # Adding a new status UNHEALTHY for abnormal status can be a choice. - global_user_state.set_cluster_status( - cluster_name, global_user_state.ClusterStatus.INIT) + global_user_state.add_or_update_cluster(cluster_name, + handle, + ready=False, + is_launch=False) return global_user_state.get_cluster_from_name(cluster_name) # Now is_abnormal is False: either node_statuses is empty or all nodes are STOPPED. backend = backends.CloudVmRayBackend() From 19c6a2a642d8396b881a26c560248a94bd2919f7 Mon Sep 17 00:00:00 2001 From: Isaac Ong Date: Wed, 30 Nov 2022 22:16:47 -0800 Subject: [PATCH 02/11] Handle IP functions should always return list --- sky/backends/cloud_vm_ray_backend.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 1076de90028..0b92de9020c 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -1709,21 +1709,21 @@ def _update_stable_cluster_ips(self, def internal_ips(self, max_attempts: int = 1, - use_cached_ips: bool = True): + use_cached_ips: bool = True) -> List[str]: if not use_cached_ips: self._update_stable_cluster_ips(max_attempts=max_attempts) if self.stable_internal_external_ips is not None: return [ips[0] for ips in self.stable_internal_external_ips] - return None + return [] def external_ips(self, max_attempts: int = 1, - use_cached_ips: bool = True): + use_cached_ips: bool = True) -> List[str]: if not use_cached_ips: self._update_stable_cluster_ips(max_attempts=max_attempts) if self.stable_internal_external_ips is not None: return [ips[1] for ips in self.stable_internal_external_ips] - return None + return [] @property def cluster_yaml(self): From b025c01a4819e4191b48f4f5e9e1e00fcffbb769 Mon Sep 17 00:00:00 2001 From: Isaac Ong Date: Wed, 30 Nov 2022 23:24:46 -0800 Subject: [PATCH 03/11] Fix head ip call --- sky/backends/cloud_vm_ray_backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 0b92de9020c..66ceed61252 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -1732,7 +1732,7 @@ def cluster_yaml(self): @property def head_ip(self): external_ips = self.external_ips() - if external_ips is not None: + if external_ips: return external_ips[0] return None From 6782ab5b775c6c37dbdbf00e2dd1172431c29e85 Mon Sep 17 00:00:00 2001 From: Isaac Ong Date: Wed, 30 Nov 2022 23:25:46 -0800 Subject: [PATCH 04/11] Fix TPU pod handling --- sky/backends/backend_utils.py | 4 +++- sky/backends/cloud_vm_ray_backend.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index 85a6e313e5a..4fbe8c236c3 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -48,6 +48,7 @@ from sky.utils import command_runner from sky.utils import subprocess_utils from sky.utils import timeline +from sky.utils import tpu_utils from sky.utils import ux_utils from sky.utils import validator from sky.usage import usage_lib @@ -1154,8 +1155,9 @@ def get_node_ips(cluster_yaml: str, if use_tpu_vm: ips = _get_tpu_vm_pod_ips(ray_config, get_internal_ips) assert expected_num_nodes == 1, 'TPU VM only supports single node for now.' - if len(ips) != expected_num_nodes: + if len(ips) != tpu_utils.get_num_tpu_devices(handle.launched_resources): raise exceptions.FetchIPError(exceptions.FetchIPError.Reason.HEAD) + return ips if get_internal_ips: with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 66ceed61252..6c2abb3aa66 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -1155,7 +1155,7 @@ def _tpu_pod_setup(self, cluster_yaml: str, run setup or launch ray cluster on TPU VM Pod nodes. """ ssh_credentials = backend_utils.ssh_credential_from_yaml(cluster_yaml) - all_ips = cluster_handle.external_ips() + all_ips = cluster_handle.external_ips(use_cached_ips=False) num_tpu_devices = tpu_utils.get_num_tpu_devices( cluster_handle.launched_resources) if len(all_ips) != num_tpu_devices: From 489aae1c0fd685b2f0f5262ed77475e959787910 Mon Sep 17 00:00:00 2001 From: Isaac Ong Date: Wed, 30 Nov 2022 23:40:10 -0800 Subject: [PATCH 05/11] Move assertion to start --- sky/backends/backend_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index 4fbe8c236c3..e3a4f6917a9 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -1153,8 +1153,8 @@ def get_node_ips(cluster_yaml: str, ray_config = common_utils.read_yaml(cluster_yaml) use_tpu_vm = ray_config['provider'].get('_has_tpus', False) if use_tpu_vm: - ips = _get_tpu_vm_pod_ips(ray_config, get_internal_ips) assert expected_num_nodes == 1, 'TPU VM only supports single node for now.' + ips = _get_tpu_vm_pod_ips(ray_config, get_internal_ips) if len(ips) != tpu_utils.get_num_tpu_devices(handle.launched_resources): raise exceptions.FetchIPError(exceptions.FetchIPError.Reason.HEAD) return ips From 9ee5917e337a8e16ca94360098ed4226d1a5f654 Mon Sep 17 00:00:00 2001 From: Isaac Ong Date: Thu, 1 Dec 2022 00:05:07 -0800 Subject: [PATCH 06/11] Update handling of IPs --- sky/backends/backend_utils.py | 2 +- sky/backends/cloud_vm_ray_backend.py | 26 ++++++++++++++++++-------- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index e3a4f6917a9..1cedd7a1d24 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -1627,7 +1627,7 @@ def _update_cluster_status_no_lock( # in ray's get IPs vs. ray runtime failing. external_ips = handle.external_ips(use_cached_ips=False) # This happens to a stopped TPU VM as we use gcloud to query the IP. - if len(external_ips) == 0: + if external_ips is None or len(external_ips) == 0: raise exceptions.FetchIPError( reason=exceptions.FetchIPError.Reason.HEAD) if handle.launched_nodes == 1: diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 6c2abb3aa66..290d6a1a31a 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -1158,9 +1158,9 @@ def _tpu_pod_setup(self, cluster_yaml: str, all_ips = cluster_handle.external_ips(use_cached_ips=False) num_tpu_devices = tpu_utils.get_num_tpu_devices( cluster_handle.launched_resources) - if len(all_ips) != num_tpu_devices: + if all_ips is None or len(all_ips) != num_tpu_devices: raise RuntimeError( - f'Number of nodes IPs: {len(all_ips)} does not' + f'Nodes IPs: {all_ips} does not' f'match number of TPU devices: {num_tpu_devices}.') # Get the private IP of head node for connecting Ray cluster. @@ -1714,7 +1714,7 @@ def internal_ips(self, self._update_stable_cluster_ips(max_attempts=max_attempts) if self.stable_internal_external_ips is not None: return [ips[0] for ips in self.stable_internal_external_ips] - return [] + return None def external_ips(self, max_attempts: int = 1, @@ -1723,7 +1723,7 @@ def external_ips(self, self._update_stable_cluster_ips(max_attempts=max_attempts) if self.stable_internal_external_ips is not None: return [ips[1] for ips in self.stable_internal_external_ips] - return [] + return None @property def cluster_yaml(self): @@ -1732,7 +1732,7 @@ def cluster_yaml(self): @property def head_ip(self): external_ips = self.external_ips() - if external_ips: + if external_ips is not None: return external_ips[0] return None @@ -2046,6 +2046,7 @@ def _sync_workdir(self, handle: ResourceHandle, workdir: Path) -> None: fore = colorama.Fore style = colorama.Style ip_list = handle.external_ips() + assert ip_list is not None, 'external_ips is not cached in the handle' full_workdir = os.path.abspath(os.path.expanduser(workdir)) # These asserts have been validated at Task construction time. @@ -2125,7 +2126,8 @@ def _setup(self, handle: ResourceHandle, task: task_lib.Task, setup_sh_path = f.name setup_file = os.path.basename(setup_sh_path) # Sync the setup script up and run it. - ip_list = handle.external_ips(max_attempts=_FETCH_IP_MAX_ATTEMPTS) + ip_list = handle.external_ips() + assert ip_list is not None, 'external_ips is not cached in the handle' ssh_credentials = backend_utils.ssh_credential_from_yaml( handle.cluster_yaml) # Disable connection sharing for setup script to avoid old @@ -2522,6 +2524,7 @@ def sync_down_logs( f'{style.RESET_ALL}') ip_list = handle.external_ips() + assert ip_list is not None, 'external_ips is not cached in the handle' ssh_credentials = backend_utils.ssh_credential_from_yaml( handle.cluster_yaml) runners = command_runner.SSHCommandRunner.make_runner_list( @@ -2931,6 +2934,7 @@ def _check_existing_cluster( def _set_tpu_name(self, handle: ResourceHandle, tpu_name: str) -> None: """Sets TPU_NAME on all nodes.""" ip_list = handle.external_ips() + assert ip_list is not None, 'external_ips is not cached in the handle' ssh_credentials = backend_utils.ssh_credential_from_yaml( handle.cluster_yaml) @@ -2964,6 +2968,7 @@ def _execute_file_mounts(self, handle: ResourceHandle, logger.info(f'{fore.CYAN}Processing file mounts.{style.RESET_ALL}') start = time.time() ip_list = handle.external_ips() + assert ip_list is not None, 'external_ips is not cached in the handle' ssh_credentials = backend_utils.ssh_credential_from_yaml( handle.cluster_yaml) runners = command_runner.SSHCommandRunner.make_runner_list( @@ -3108,6 +3113,7 @@ def _execute_storage_mounts(self, handle: ResourceHandle, f'storage mount{plural}.{style.RESET_ALL}') start = time.time() ip_list = handle.external_ips() + assert ip_list is not None, 'external_ips is not cached in the handle' ssh_credentials = backend_utils.ssh_credential_from_yaml( handle.cluster_yaml) runners = command_runner.SSHCommandRunner.make_runner_list( @@ -3141,6 +3147,8 @@ def _execute_task_one_node(self, handle: ResourceHandle, log_dir = os.path.join(self.log_dir, 'tasks') accelerator_dict = backend_utils.get_task_demands_dict(task) + internal_ips = handle.internal_ips() + assert internal_ips is not None, 'internal_ips is not cached in the handle' codegen = RayCodeGen() is_local = isinstance(handle.launched_resources.cloud, clouds.Local) @@ -3153,7 +3161,7 @@ def _execute_task_one_node(self, handle: ResourceHandle, codegen.add_gang_scheduling_placement_group( 1, accelerator_dict, - stable_cluster_internal_ips=handle.internal_ips()) + stable_cluster_internal_ips=internal_ips) if callable(task.run): run_fn_code = textwrap.dedent(inspect.getsource(task.run)) @@ -3196,6 +3204,8 @@ def _execute_task_n_nodes(self, handle: ResourceHandle, task: task_lib.Task, log_dir_base = self.log_dir log_dir = os.path.join(log_dir_base, 'tasks') accelerator_dict = backend_utils.get_task_demands_dict(task) + internal_ips = handle.internal_ips() + assert internal_ips is not None, 'internal_ips is not cached in the handle' # If TPU VM Pods is used, #num_nodes should be #num_tpu_devices is_tpu_vm_pod = tpu_utils.is_tpu_vm_pod(handle.launched_resources) @@ -3216,7 +3226,7 @@ def _execute_task_n_nodes(self, handle: ResourceHandle, task: task_lib.Task, codegen.add_gang_scheduling_placement_group( num_actual_nodes, accelerator_dict, - stable_cluster_internal_ips=handle.internal_ips()) + stable_cluster_internal_ips=internal_ips) if callable(task.run): run_fn_code = textwrap.dedent(inspect.getsource(task.run)) From 3caea513188b7787e950ee6941160f4796ee99e5 Mon Sep 17 00:00:00 2001 From: Isaac Ong Date: Thu, 1 Dec 2022 00:05:17 -0800 Subject: [PATCH 07/11] Always run tpu_vm_pod test --- tests/test_smoke.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_smoke.py b/tests/test_smoke.py index af59fd3870e..db1fce377e2 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -524,7 +524,6 @@ def test_tpu_vm(): # ---------- TPU VM Pod. ---------- # Mark slow because it's expensive to run. -@pytest.mark.slow def test_tpu_vm_pod(): name = _get_cluster_name() test = Test( From 24a9fafdd44bcb476f253df0a2cec35560b992cd Mon Sep 17 00:00:00 2001 From: Isaac Ong Date: Thu, 1 Dec 2022 00:06:27 -0800 Subject: [PATCH 08/11] Update types --- sky/backends/cloud_vm_ray_backend.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 290d6a1a31a..f90b8ae47a5 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -1709,7 +1709,7 @@ def _update_stable_cluster_ips(self, def internal_ips(self, max_attempts: int = 1, - use_cached_ips: bool = True) -> List[str]: + use_cached_ips: bool = True) -> Optional[List[str]]: if not use_cached_ips: self._update_stable_cluster_ips(max_attempts=max_attempts) if self.stable_internal_external_ips is not None: @@ -1718,7 +1718,7 @@ def internal_ips(self, def external_ips(self, max_attempts: int = 1, - use_cached_ips: bool = True) -> List[str]: + use_cached_ips: bool = True) -> Optional[List[str]]: if not use_cached_ips: self._update_stable_cluster_ips(max_attempts=max_attempts) if self.stable_internal_external_ips is not None: @@ -3159,9 +3159,7 @@ def _execute_task_one_node(self, handle: ResourceHandle, setup_log_path=os.path.join(log_dir, 'setup.log'), is_local=is_local) codegen.add_gang_scheduling_placement_group( - 1, - accelerator_dict, - stable_cluster_internal_ips=internal_ips) + 1, accelerator_dict, stable_cluster_internal_ips=internal_ips) if callable(task.run): run_fn_code = textwrap.dedent(inspect.getsource(task.run)) From 063f8fb792cabb199b4e381c3b54db72fd656434 Mon Sep 17 00:00:00 2001 From: Isaac Ong Date: Thu, 1 Dec 2022 00:07:44 -0800 Subject: [PATCH 09/11] Fix linting --- sky/backends/cloud_vm_ray_backend.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index f90b8ae47a5..d85d4d1148e 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -2046,7 +2046,7 @@ def _sync_workdir(self, handle: ResourceHandle, workdir: Path) -> None: fore = colorama.Fore style = colorama.Style ip_list = handle.external_ips() - assert ip_list is not None, 'external_ips is not cached in the handle' + assert ip_list is not None, 'external_ips is not cached in handle' full_workdir = os.path.abspath(os.path.expanduser(workdir)) # These asserts have been validated at Task construction time. @@ -2127,7 +2127,7 @@ def _setup(self, handle: ResourceHandle, task: task_lib.Task, setup_file = os.path.basename(setup_sh_path) # Sync the setup script up and run it. ip_list = handle.external_ips() - assert ip_list is not None, 'external_ips is not cached in the handle' + assert ip_list is not None, 'external_ips is not cached in handle' ssh_credentials = backend_utils.ssh_credential_from_yaml( handle.cluster_yaml) # Disable connection sharing for setup script to avoid old @@ -2524,7 +2524,7 @@ def sync_down_logs( f'{style.RESET_ALL}') ip_list = handle.external_ips() - assert ip_list is not None, 'external_ips is not cached in the handle' + assert ip_list is not None, 'external_ips is not cached in handle' ssh_credentials = backend_utils.ssh_credential_from_yaml( handle.cluster_yaml) runners = command_runner.SSHCommandRunner.make_runner_list( @@ -2934,7 +2934,7 @@ def _check_existing_cluster( def _set_tpu_name(self, handle: ResourceHandle, tpu_name: str) -> None: """Sets TPU_NAME on all nodes.""" ip_list = handle.external_ips() - assert ip_list is not None, 'external_ips is not cached in the handle' + assert ip_list is not None, 'external_ips is not cached in handle' ssh_credentials = backend_utils.ssh_credential_from_yaml( handle.cluster_yaml) @@ -2968,7 +2968,7 @@ def _execute_file_mounts(self, handle: ResourceHandle, logger.info(f'{fore.CYAN}Processing file mounts.{style.RESET_ALL}') start = time.time() ip_list = handle.external_ips() - assert ip_list is not None, 'external_ips is not cached in the handle' + assert ip_list is not None, 'external_ips is not cached in handle' ssh_credentials = backend_utils.ssh_credential_from_yaml( handle.cluster_yaml) runners = command_runner.SSHCommandRunner.make_runner_list( @@ -3113,7 +3113,7 @@ def _execute_storage_mounts(self, handle: ResourceHandle, f'storage mount{plural}.{style.RESET_ALL}') start = time.time() ip_list = handle.external_ips() - assert ip_list is not None, 'external_ips is not cached in the handle' + assert ip_list is not None, 'external_ips is not cached in handle' ssh_credentials = backend_utils.ssh_credential_from_yaml( handle.cluster_yaml) runners = command_runner.SSHCommandRunner.make_runner_list( @@ -3148,7 +3148,7 @@ def _execute_task_one_node(self, handle: ResourceHandle, accelerator_dict = backend_utils.get_task_demands_dict(task) internal_ips = handle.internal_ips() - assert internal_ips is not None, 'internal_ips is not cached in the handle' + assert internal_ips is not None, 'internal_ips is not cached in handle' codegen = RayCodeGen() is_local = isinstance(handle.launched_resources.cloud, clouds.Local) @@ -3203,7 +3203,7 @@ def _execute_task_n_nodes(self, handle: ResourceHandle, task: task_lib.Task, log_dir = os.path.join(log_dir_base, 'tasks') accelerator_dict = backend_utils.get_task_demands_dict(task) internal_ips = handle.internal_ips() - assert internal_ips is not None, 'internal_ips is not cached in the handle' + assert internal_ips is not None, 'internal_ips is not cached in handle' # If TPU VM Pods is used, #num_nodes should be #num_tpu_devices is_tpu_vm_pod = tpu_utils.is_tpu_vm_pod(handle.launched_resources) From 3ec6ea0377ff157ab5b7c34a212fec0b0a890543 Mon Sep 17 00:00:00 2001 From: Isaac Ong Date: Thu, 1 Dec 2022 00:09:43 -0800 Subject: [PATCH 10/11] Remove smoke test comment --- tests/test_smoke.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_smoke.py b/tests/test_smoke.py index db1fce377e2..082311c6a5b 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -523,7 +523,6 @@ def test_tpu_vm(): # ---------- TPU VM Pod. ---------- -# Mark slow because it's expensive to run. def test_tpu_vm_pod(): name = _get_cluster_name() test = Test( From b3247d89466fdd388423ac9f779efb466e348643 Mon Sep 17 00:00:00 2001 From: Isaac Ong Date: Thu, 1 Dec 2022 00:47:41 -0800 Subject: [PATCH 11/11] Update tpu pod smoke test --- tests/test_smoke.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 082311c6a5b..42428a2b75a 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -528,7 +528,7 @@ def test_tpu_vm_pod(): test = Test( 'tpu_pod', [ - f'sky launch -y -c {name} examples/tpu/tpuvm_mnist.yaml --gpus tpu-v2-32', + f'sky launch -y -c {name} examples/tpu/tpuvm_mnist.yaml --gpus tpu-v2-32 --use-spot --zone europe-west4-a', f'sky logs {name} 1', # Ensure the job finished. f'sky logs {name} 1 --status', # Ensure the job succeeded. ],