From a1af159313a5f3a1ee537b0620e03f362b4559b8 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Sat, 5 Oct 2024 16:54:17 -0700 Subject: [PATCH 01/23] sky global status for kubernetes --- sky/cli.py | 21 ++++++- sky/provision/kubernetes/utils.py | 23 ++++++++ sky/utils/cli_utils/status_utils.py | 85 +++++++++++++++++++++++++++++ 3 files changed, 128 insertions(+), 1 deletion(-) diff --git a/sky/cli.py b/sky/cli.py index c538c99aeb3..e8d84226ef4 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -1503,6 +1503,11 @@ def _get_services(service_names: Optional[List[str]], is_flag=True, required=False, help='Also show sky serve services, if any.') +@click.option('--kubernetes', + default=False, + is_flag=True, + required=False, + help='[Experimental] Show SkyPilot clusters from all users on Kubernetes.') @click.argument('clusters', required=False, type=str, @@ -1512,7 +1517,7 @@ def _get_services(service_names: Optional[List[str]], # pylint: disable=redefined-builtin def status(all: bool, refresh: bool, ip: bool, endpoints: bool, endpoint: Optional[int], show_managed_jobs: bool, - show_services: bool, clusters: List[str]): + show_services: bool, kubernetes: bool, clusters: List[str]): # NOTE(dev): Keep the docstring consistent between the Python API and CLI. """Show clusters. @@ -1571,6 +1576,20 @@ def status(all: bool, refresh: bool, ip: bool, endpoints: bool, or for autostop-enabled clusters, use ``--refresh`` to query the latest cluster statuses from the cloud providers. 
""" + if kubernetes: + click.echo( + f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}SkyPilot Clusters on Kubernetes' + f'{colorama.Style.RESET_ALL}') + + try: + pods = kubernetes_utils.get_skypilot_pods() + except Exception as e: + with ux_utils.print_exception_no_traceback(): + raise ValueError( + f'Failed to get SkyPilot pods from Kubernetes: {str(e)}') + + status_utils.show_kubernetes_status_table(pods, all) + return # Using a pool with 2 worker to run the managed job query and sky serve # service query in parallel to speed up. The pool provides a AsyncResult # object that can be used as a future. diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index 0498cc7f59f..bf22bdee419 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -1998,3 +1998,26 @@ def get_context_from_config(provider_config: Dict[str, Any]) -> Optional[str]: # we need to use in-cluster auth. context = None return context + +def get_skypilot_pods(context: Optional[str] = None) -> List[Any]: + """Gets all SkyPilot pods in the Kubernetes cluster. + + Args: + context: The Kubernetes context to use. If None, uses the current context. + + Returns: + A list of Kubernetes pod objects. + """ + if context is None: + context = get_current_kube_config_context_name() + + try: + pods = kubernetes.core_api(context).list_pod_for_all_namespaces( + label_selector='skypilot-cluster', + _request_timeout=kubernetes.API_TIMEOUT).items + except kubernetes.max_retry_error(): + raise exceptions.ResourcesUnavailableError( + 'Timed out when trying to get SkyPilot pod info from Kubernetes cluster. ' + 'Please check if the cluster is healthy and retry. 
To debug, run: ' + 'kubectl get pods --selector=skypilot-cluster --all-namespaces') from None + return pods diff --git a/sky/utils/cli_utils/status_utils.py b/sky/utils/cli_utils/status_utils.py index 3a783f03bb4..9bb576620df 100644 --- a/sky/utils/cli_utils/status_utils.py +++ b/sky/utils/cli_utils/status_utils.py @@ -6,9 +6,11 @@ from sky import backends from sky import status_lib +from sky import clouds from sky.skylet import constants from sky.utils import log_utils from sky.utils import resources_utils +from sky.provision.kubernetes import utils as kubernetes_utils COMMAND_TRUNC_LENGTH = 25 NUM_COST_REPORT_LINES = 5 @@ -316,3 +318,86 @@ def _get_estimated_cost_for_cost_report( return '-' return f'$ {cost:.2f}' + +def process_skypilot_pods(pods: List[Any]) -> List[Dict[str, Any]]: + """Process SkyPilot pods and group them by cluster.""" + # pylint: disable=import-outside-toplevel + from sky import resources as resources_lib + clusters = {} + for pod in pods: + cluster_name = pod.metadata.labels.get('skypilot-cluster') + if cluster_name not in clusters: + # Parse the earliest start time for the cluster + start_time = pod.status.start_time + if start_time: + # Parse to unix timestamp and convert to int + start_time = start_time.timestamp() + + # Parse resources + cpu_request = kubernetes_utils.parse_cpu_or_gpu_resource(pod.spec.containers[0].resources.requests.get('cpu', '0')) + memory_request = kubernetes_utils.parse_memory_resource(pod.spec.containers[0].resources.requests.get('memory', '0')) / 1073741824 # Convert to GiB + gpu_count = kubernetes_utils.parse_cpu_or_gpu_resource(pod.spec.containers[0].resources.requests.get('nvidia.com/gpu', '0')) + if gpu_count > 0: + # TODO: We should pass context here + context = None + label_formatter, _ = kubernetes_utils.detect_gpu_label_formatter(context) + gpu_label = label_formatter.get_label_key() + # Get GPU name from pod node selector + if pod.spec.node_selector is not None: + gpu_name = 
label_formatter.get_accelerator_from_label_value( + pod.spec.node_selector.get(gpu_label)) + + resources = resources_lib.Resources( + cloud=clouds.Kubernetes(), + cpus=float(cpu_request), + memory=memory_request, + accelerators=(f'{gpu_name}:{gpu_count}' if gpu_count > 0 else None) + ) + + clusters[cluster_name] = { + 'name': cluster_name, + 'user': pod.metadata.labels.get('skypilot-user'), + 'status': status_lib.ClusterStatus.UP, # Assuming UP if pod exists + 'pods': [], + 'launched_at': start_time, + 'resources': resources, + 'resources_str': f'{len(pods)}x {resources}', + } + else: + # Update start_time if this pod started earlier + pod_start_time = pod.status.start_time + if pod_start_time: + pod_start_time = pod_start_time.replace(tzinfo=None) + if pod_start_time < clusters[cluster_name]['launched_at']: + clusters[cluster_name]['launched_at'] = pod_start_time + + clusters[cluster_name]['pods'].append(pod) + return list(clusters.values()) + +def show_kubernetes_status_table(pods: List[Any], show_all: bool) -> None: + """Compute cluster table values and display for Kubernetes clusters.""" + status_columns = [ + StatusColumn('NAME', lambda c: c['name']), + StatusColumn('USER', lambda c: c['user']), + StatusColumn('LAUNCHED', lambda c: log_utils.readable_time_duration(c['launched_at'])), + StatusColumn('RESOURCES', lambda c: c['resources_str'], trunc_length=70 if not show_all else 0), + # StatusColumn('PODS', lambda c: len(c['pods'])), + StatusColumn('STATUS', lambda c: c['status'].colored_str()), + # Add more columns as needed + ] + + clusters = process_skypilot_pods(pods) + columns = [col.name for col in status_columns if col.show_by_default or show_all] + cluster_table = log_utils.create_table(columns) + + for cluster in clusters: + row = [] + for status_column in status_columns: + if status_column.show_by_default or show_all: + row.append(status_column.calc(cluster)) + cluster_table.add_row(row) + + if clusters: + click.echo(cluster_table) + else: + 
click.echo('No existing SkyPilot clusters on Kubernetes.') \ No newline at end of file From d56d22376ec27496d534475b65eaa780cb43cb56 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Mon, 7 Oct 2024 22:15:16 -0700 Subject: [PATCH 02/23] add parsing for jobs controllers --- sky/cli.py | 111 +++++++++++++++++++++++++++- sky/jobs/__init__.py | 2 + sky/jobs/core.py | 49 ++++++++++++ sky/jobs/utils.py | 24 +++++- sky/utils/cli_utils/status_utils.py | 60 +-------------- 5 files changed, 183 insertions(+), 63 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index e8d84226ef4..abe2712db61 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -1458,6 +1458,94 @@ def _get_services(service_names: Optional[List[str]], return num_services, msg +def _process_skypilot_pods(pods: List[Any]) -> List[Dict[str, Any]]: + """Process SkyPilot pods and group them by cluster.""" + # pylint: disable=import-outside-toplevel + from sky import resources as resources_lib + from sky import clouds + clusters: Dict[str, Dict] = {} + jobs_controllers: Dict[str, Dict] = {} + serve_controllers: Dict[str, Dict] = {} + + for pod in pods: + cluster_name_on_cloud = pod.metadata.labels.get('skypilot-cluster') + + # Check if name is name of a controller + # Can't use controller_utils.Controllers.from_name(cluster_name) + # because hash is different across users + if 'controller' in cluster_name_on_cloud: + start_time = pod.status.start_time.timestamp() + controller_info = { + 'name': cluster_name_on_cloud, + 'user': pod.metadata.labels.get('skypilot-user'), + 'status': status_lib.ClusterStatus.UP, + # Assuming UP if pod exists + 'pods': [pod], + 'launched_at': start_time + } + if 'sky-jobs-controller' in cluster_name_on_cloud: + jobs_controllers[cluster_name_on_cloud] = controller_info + elif 'sky-serve-controller' in cluster_name_on_cloud: + serve_controllers[cluster_name_on_cloud] = controller_info + + if cluster_name_on_cloud not in clusters: + # Parse the earliest start time for the cluster + if 
pod.status.start_time is None: + start_time = 0 + else: + start_time = pod.status.start_time.timestamp() + + # Parse resources + cpu_request = kubernetes_utils.parse_cpu_or_gpu_resource( + pod.spec.containers[0].resources.requests.get('cpu', '0')) + memory_request = kubernetes_utils.parse_memory_resource( + pod.spec.containers[0].resources.requests.get('memory', + '0')) / 1073741824 # Convert to GiB + gpu_count = kubernetes_utils.parse_cpu_or_gpu_resource( + pod.spec.containers[0].resources.requests.get('nvidia.com/gpu', + '0')) + if gpu_count > 0: + # TODO: We should pass context here + context = None + label_formatter, _ = kubernetes_utils.detect_gpu_label_formatter( + context) + gpu_label = label_formatter.get_label_key() + # Get GPU name from pod node selector + if pod.spec.node_selector is not None: + gpu_name = label_formatter.get_accelerator_from_label_value( + pod.spec.node_selector.get(gpu_label)) + + resources = resources_lib.Resources( + cloud=clouds.Kubernetes(), + cpus=float(cpu_request), + memory=memory_request, + accelerators=( + f'{gpu_name}:{gpu_count}' if gpu_count > 0 else None) + ) + if pod.status.phase == 'Pending': + # If pod is pending, do not show it in the status + continue + + clusters[cluster_name_on_cloud] = { + 'name': cluster_name_on_cloud, + 'user': pod.metadata.labels.get('skypilot-user'), + 'status': status_lib.ClusterStatus.UP, + 'pods': [], + 'launched_at': start_time, + 'resources': resources, + 'resources_str': f'{len(pods)}x {resources}', # TODO: Fixme number of pods. 
+ } + else: + # Update start_time if this pod started earlier + pod_start_time = pod.status.start_time + if pod_start_time: + pod_start_time = pod_start_time.replace(tzinfo=None) + if pod_start_time < clusters[cluster_name_on_cloud]['launched_at']: + clusters[cluster_name_on_cloud]['launched_at'] = pod_start_time + + clusters[cluster_name_on_cloud]['pods'].append(pod) + return list(clusters.values()), jobs_controllers, serve_controllers + @cli.command() @click.option('--all', '-a', @@ -1588,7 +1676,28 @@ def status(all: bool, refresh: bool, ip: bool, endpoints: bool, raise ValueError( f'Failed to get SkyPilot pods from Kubernetes: {str(e)}') - status_utils.show_kubernetes_status_table(pods, all) + clusters, jobs_controllers, serve_controllers = _process_skypilot_pods(pods) + status_utils.show_kubernetes_status_table(clusters, all) + + all_jobs = [] + + for job_controller_name, job_controller_info in jobs_controllers.items(): + user = job_controller_info['user'] + pod = job_controller_info['pods'][0] + with rich_utils.safe_status(f'[bold cyan]Checking for in-progress managed jobs for {user}[/]'): + jobs = managed_jobs.queue_kubernetes(pod.metadata.name, refresh=False) + # Add user field to jobs + for job in jobs: + job['user'] = user + all_jobs.extend(jobs) + if all_jobs: + click.echo( + f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' + f'Managed jobs from {len(jobs_controllers)} users on Kubernetes' + f'{colorama.Style.RESET_ALL}') + msg = managed_jobs.format_job_table(all_jobs, + show_all=all) + click.echo(msg) return # Using a pool with 2 worker to run the managed job query and sky serve # service query in parallel to speed up. 
The pool provides a AsyncResult diff --git a/sky/jobs/__init__.py b/sky/jobs/__init__.py index 922bb613ff7..8dcc7a399fb 100644 --- a/sky/jobs/__init__.py +++ b/sky/jobs/__init__.py @@ -8,6 +8,7 @@ from sky.jobs.core import cancel from sky.jobs.core import launch from sky.jobs.core import queue +from sky.jobs.core import queue_kubernetes from sky.jobs.core import tail_logs from sky.jobs.recovery_strategy import DEFAULT_RECOVERY_STRATEGY from sky.jobs.recovery_strategy import RECOVERY_STRATEGIES @@ -34,6 +35,7 @@ 'cancel', 'launch', 'queue', + 'queue_kubernetes', 'tail_logs', # utils 'ManagedJobCodeGen', diff --git a/sky/jobs/core.py b/sky/jobs/core.py index c4f59f65eca..12d5f83817e 100644 --- a/sky/jobs/core.py +++ b/sky/jobs/core.py @@ -138,6 +138,55 @@ def launch( _disable_controller_check=True) +def queue_kubernetes(pod_name: str, refresh: bool, skip_finished: bool = False) -> List[Dict[str, Any]]: + """Gets the jobs queue from the kubernetes cluster by reconstructing a cluster handle and running the appropriate code on the head node. + + Args: + pod_name: + refresh: + skip_finished: + + Returns: + + """ + from sky.provision import common + from sky import provision as provision_lib + cluster_name = pod_name.strip('-head').rsplit('-', 1)[0] + + provider_config = {} # TODO: Specify context and namespace here. 
+ instances = {pod_name: None} + cluster_info = common.ClusterInfo(provider_name='kubernetes', head_instance_id=pod_name, provider_config=provider_config, instances=instances) + managed_jobs_runner = provision_lib.get_command_runners('kubernetes', + cluster_info)[0] + + code = managed_job_utils.ManagedJobCodeGen.get_job_table() + returncode, job_table_payload, stderr = managed_jobs_runner.run( + code, + require_outputs=True, + separate_stderr=True, + stream_logs=False, + ) + try: + subprocess_utils.handle_returncode(returncode, + code, + 'Failed to fetch managed jobs', + job_table_payload + stderr, + stream_logs=False) + except exceptions.CommandError as e: + raise RuntimeError(str(e)) from e + + jobs = managed_job_utils.load_managed_job_queue(job_table_payload) + if skip_finished: + # Filter out the finished jobs. If a multi-task job is partially + # finished, we will include all its tasks. + non_finished_tasks = list( + filter(lambda job: not job['status'].is_terminal(), jobs)) + non_finished_job_ids = {job['job_id'] for job in non_finished_tasks} + jobs = list( + filter(lambda job: job['job_id'] in non_finished_job_ids, jobs)) + return jobs + + @usage_lib.entrypoint def queue(refresh: bool, skip_finished: bool = False) -> List[Dict[str, Any]]: # NOTE(dev): Keep the docstring consistent between the Python API and CLI. diff --git a/sky/jobs/utils.py b/sky/jobs/utils.py index 524a0cb0478..ae0cf4539d8 100644 --- a/sky/jobs/utils.py +++ b/sky/jobs/utils.py @@ -599,10 +599,20 @@ def format_job_table( a list of "rows" (each of which is a list of str). """ jobs = collections.defaultdict(list) + # Check if the tasks have user information. 
+ tasks_have_user = any([task.get('user') for task in tasks]) + if max_jobs and tasks_have_user: + raise ValueError('max_jobs is not supported when tasks have user info.') + + def get_hash(task): + if tasks_have_user: + return (task['user'], task['job_id']) + return task['job_id'] + for task in tasks: # The tasks within the same job_id are already sorted # by the task_id. - jobs[task['job_id']].append(task) + jobs[get_hash(task)].append(task) jobs = dict(jobs) status_counts: Dict[str, int] = collections.defaultdict(int) @@ -622,6 +632,8 @@ def format_job_table( ] if show_all: columns += ['STARTED', 'CLUSTER', 'REGION', 'FAILURE'] + if tasks_have_user: + columns.insert(0, 'USER') job_table = log_utils.create_table(columns) status_counts: Dict[str, int] = collections.defaultdict(int) @@ -636,9 +648,9 @@ def format_job_table( for task in all_tasks: # The tasks within the same job_id are already sorted # by the task_id. - jobs[task['job_id']].append(task) + jobs[get_hash(task)].append(task) - for job_id, job_tasks in jobs.items(): + for job_hash, job_tasks in jobs.items(): if len(job_tasks) > 1: # Aggregate the tasks into a new row in the table. 
job_name = job_tasks[0]['job_name'] @@ -675,7 +687,7 @@ def format_job_table( status_str += f' (task: {current_task_id})' job_values = [ - job_id, + job_hash[1] if tasks_have_user else job_hash, # TODO: Clean this up '', job_name, '-', @@ -692,6 +704,8 @@ def format_job_table( '-', failure_reason if failure_reason is not None else '-', ]) + if tasks_have_user: + job_values.insert(0, job_tasks[0].get('user', '-')) job_table.add_row(job_values) for task in job_tasks: @@ -724,6 +738,8 @@ def format_job_table( task['failure_reason'] if task['failure_reason'] is not None else '-', ]) + if tasks_have_user: + values.insert(0, task.get('user', '-')) job_table.add_row(values) if len(job_tasks) > 1: diff --git a/sky/utils/cli_utils/status_utils.py b/sky/utils/cli_utils/status_utils.py index 9bb576620df..698a57cc5dd 100644 --- a/sky/utils/cli_utils/status_utils.py +++ b/sky/utils/cli_utils/status_utils.py @@ -319,66 +319,11 @@ def _get_estimated_cost_for_cost_report( return f'$ {cost:.2f}' -def process_skypilot_pods(pods: List[Any]) -> List[Dict[str, Any]]: - """Process SkyPilot pods and group them by cluster.""" - # pylint: disable=import-outside-toplevel - from sky import resources as resources_lib - clusters = {} - for pod in pods: - cluster_name = pod.metadata.labels.get('skypilot-cluster') - if cluster_name not in clusters: - # Parse the earliest start time for the cluster - start_time = pod.status.start_time - if start_time: - # Parse to unix timestamp and convert to int - start_time = start_time.timestamp() - - # Parse resources - cpu_request = kubernetes_utils.parse_cpu_or_gpu_resource(pod.spec.containers[0].resources.requests.get('cpu', '0')) - memory_request = kubernetes_utils.parse_memory_resource(pod.spec.containers[0].resources.requests.get('memory', '0')) / 1073741824 # Convert to GiB - gpu_count = kubernetes_utils.parse_cpu_or_gpu_resource(pod.spec.containers[0].resources.requests.get('nvidia.com/gpu', '0')) - if gpu_count > 0: - # TODO: We should pass context 
here - context = None - label_formatter, _ = kubernetes_utils.detect_gpu_label_formatter(context) - gpu_label = label_formatter.get_label_key() - # Get GPU name from pod node selector - if pod.spec.node_selector is not None: - gpu_name = label_formatter.get_accelerator_from_label_value( - pod.spec.node_selector.get(gpu_label)) - - resources = resources_lib.Resources( - cloud=clouds.Kubernetes(), - cpus=float(cpu_request), - memory=memory_request, - accelerators=(f'{gpu_name}:{gpu_count}' if gpu_count > 0 else None) - ) - - clusters[cluster_name] = { - 'name': cluster_name, - 'user': pod.metadata.labels.get('skypilot-user'), - 'status': status_lib.ClusterStatus.UP, # Assuming UP if pod exists - 'pods': [], - 'launched_at': start_time, - 'resources': resources, - 'resources_str': f'{len(pods)}x {resources}', - } - else: - # Update start_time if this pod started earlier - pod_start_time = pod.status.start_time - if pod_start_time: - pod_start_time = pod_start_time.replace(tzinfo=None) - if pod_start_time < clusters[cluster_name]['launched_at']: - clusters[cluster_name]['launched_at'] = pod_start_time - - clusters[cluster_name]['pods'].append(pod) - return list(clusters.values()) - -def show_kubernetes_status_table(pods: List[Any], show_all: bool) -> None: +def show_kubernetes_status_table(clusters: List[Any], show_all: bool) -> None: """Compute cluster table values and display for Kubernetes clusters.""" status_columns = [ - StatusColumn('NAME', lambda c: c['name']), StatusColumn('USER', lambda c: c['user']), + StatusColumn('NAME', lambda c: c['name']), StatusColumn('LAUNCHED', lambda c: log_utils.readable_time_duration(c['launched_at'])), StatusColumn('RESOURCES', lambda c: c['resources_str'], trunc_length=70 if not show_all else 0), # StatusColumn('PODS', lambda c: len(c['pods'])), @@ -386,7 +331,6 @@ def show_kubernetes_status_table(pods: List[Any], show_all: bool) -> None: # Add more columns as needed ] - clusters = process_skypilot_pods(pods) columns = [col.name 
for col in status_columns if col.show_by_default or show_all] cluster_table = log_utils.create_table(columns) From 96193390b0d32757aac53e4b974486dbe9d2a38b Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Tue, 8 Oct 2024 16:16:08 -0700 Subject: [PATCH 03/23] better parsing for jobs controllers --- sky/cli.py | 42 ++++++++++++++++++++++------- sky/utils/cli_utils/status_utils.py | 2 +- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index abe2712db61..727e3cf2b15 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -1469,6 +1469,7 @@ def _process_skypilot_pods(pods: List[Any]) -> List[Dict[str, Any]]: for pod in pods: cluster_name_on_cloud = pod.metadata.labels.get('skypilot-cluster') + cluster_name = cluster_name_on_cloud.rsplit('-', 1)[0] # Remove the user hash to get cluster name (e.g., mycluster-2ea4) # Check if name is name of a controller # Can't use controller_utils.Controllers.from_name(cluster_name) @@ -1476,7 +1477,8 @@ def _process_skypilot_pods(pods: List[Any]) -> List[Dict[str, Any]]: if 'controller' in cluster_name_on_cloud: start_time = pod.status.start_time.timestamp() controller_info = { - 'name': cluster_name_on_cloud, + 'cluster_name_on_cloud': cluster_name_on_cloud, + 'cluster_name': cluster_name, 'user': pod.metadata.labels.get('skypilot-user'), 'status': status_lib.ClusterStatus.UP, # Assuming UP if pod exists @@ -1527,23 +1529,28 @@ def _process_skypilot_pods(pods: List[Any]) -> List[Dict[str, Any]]: continue clusters[cluster_name_on_cloud] = { - 'name': cluster_name_on_cloud, + 'cluster_name_on_cloud': cluster_name_on_cloud, + 'cluster_name': cluster_name, 'user': pod.metadata.labels.get('skypilot-user'), 'status': status_lib.ClusterStatus.UP, 'pods': [], 'launched_at': start_time, 'resources': resources, - 'resources_str': f'{len(pods)}x {resources}', # TODO: Fixme number of pods. 
} else: # Update start_time if this pod started earlier pod_start_time = pod.status.start_time if pod_start_time: - pod_start_time = pod_start_time.replace(tzinfo=None) + pod_start_time = pod_start_time.timestamp() if pod_start_time < clusters[cluster_name_on_cloud]['launched_at']: clusters[cluster_name_on_cloud]['launched_at'] = pod_start_time - clusters[cluster_name_on_cloud]['pods'].append(pod) + # Update resources_str in clusters: + for cluster_name, cluster in clusters.items(): + resources = cluster['resources'] + num_pods = len(cluster['pods']) + resources_str = f'{num_pods}x {resources}' + cluster['resources_str'] = resources_str return list(clusters.values()), jobs_controllers, serve_controllers @cli.command() @@ -1665,9 +1672,6 @@ def status(all: bool, refresh: bool, ip: bool, endpoints: bool, cluster statuses from the cloud providers. """ if kubernetes: - click.echo( - f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}SkyPilot Clusters on Kubernetes' - f'{colorama.Style.RESET_ALL}') try: pods = kubernetes_utils.get_skypilot_pods() @@ -1677,7 +1681,6 @@ def status(all: bool, refresh: bool, ip: bool, endpoints: bool, f'Failed to get SkyPilot pods from Kubernetes: {str(e)}') clusters, jobs_controllers, serve_controllers = _process_skypilot_pods(pods) - status_utils.show_kubernetes_status_table(clusters, all) all_jobs = [] @@ -1690,6 +1693,27 @@ def status(all: bool, refresh: bool, ip: bool, endpoints: bool, for job in jobs: job['user'] = user all_jobs.extend(jobs) + + # Reconcile cluster state between managed jobs and clusters + # We don't want to show SkyPilot clusters that are part of a managed job + # in the main cluster list. + # Since we do not have any identifiers for which clusters are part of a + # managed job or service, we construct a list of managed job cluster + # names from the list of managed jobs and use that to exclude clusters. 
+ managed_job_cluster_names = set() + for job in all_jobs: + # Managed job cluster name is - + managed_cluster_name = f'{job["job_name"]}-{job["job_id"]}' + managed_job_cluster_names.add(managed_cluster_name) + + unmanaged_clusters = [c for c in clusters if c['cluster_name'] not in managed_job_cluster_names] + + # Display clusters and jobs. + click.echo( + f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}SkyPilot Clusters on Kubernetes' + f'{colorama.Style.RESET_ALL}') + status_utils.show_kubernetes_status_table(unmanaged_clusters, all) + if all_jobs: click.echo( f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' diff --git a/sky/utils/cli_utils/status_utils.py b/sky/utils/cli_utils/status_utils.py index 698a57cc5dd..070333a3bf2 100644 --- a/sky/utils/cli_utils/status_utils.py +++ b/sky/utils/cli_utils/status_utils.py @@ -323,7 +323,7 @@ def show_kubernetes_status_table(clusters: List[Any], show_all: bool) -> None: """Compute cluster table values and display for Kubernetes clusters.""" status_columns = [ StatusColumn('USER', lambda c: c['user']), - StatusColumn('NAME', lambda c: c['name']), + StatusColumn('NAME', lambda c: c['cluster_name']), StatusColumn('LAUNCHED', lambda c: log_utils.readable_time_duration(c['launched_at'])), StatusColumn('RESOURCES', lambda c: c['resources_str'], trunc_length=70 if not show_all else 0), # StatusColumn('PODS', lambda c: len(c['pods'])), From 74fd6f24d8091704d3c3746aca14996c7fba5896 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Tue, 8 Oct 2024 17:03:22 -0700 Subject: [PATCH 04/23] sorting --- sky/cli.py | 27 ++++++++++++++++----------- sky/jobs/core.py | 8 ++++++-- sky/utils/cli_utils/status_utils.py | 9 +++++++-- 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index 727e3cf2b15..942bf65b599 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -1491,10 +1491,9 @@ def _process_skypilot_pods(pods: List[Any]) -> List[Dict[str, Any]]: serve_controllers[cluster_name_on_cloud] = controller_info if 
cluster_name_on_cloud not in clusters: - # Parse the earliest start time for the cluster - if pod.status.start_time is None: - start_time = 0 - else: + # Parse the start time for the cluster + start_time = pod.status.start_time + if start_time is not None: start_time = pod.status.start_time.timestamp() # Parse resources @@ -1502,7 +1501,7 @@ def _process_skypilot_pods(pods: List[Any]) -> List[Dict[str, Any]]: pod.spec.containers[0].resources.requests.get('cpu', '0')) memory_request = kubernetes_utils.parse_memory_resource( pod.spec.containers[0].resources.requests.get('memory', - '0')) / 1073741824 # Convert to GiB + '0'), unit='G') gpu_count = kubernetes_utils.parse_cpu_or_gpu_resource( pod.spec.containers[0].resources.requests.get('nvidia.com/gpu', '0')) @@ -1519,8 +1518,8 @@ def _process_skypilot_pods(pods: List[Any]) -> List[Dict[str, Any]]: resources = resources_lib.Resources( cloud=clouds.Kubernetes(), - cpus=float(cpu_request), - memory=memory_request, + cpus=int(cpu_request), + memory=int(memory_request), accelerators=( f'{gpu_name}:{gpu_count}' if gpu_count > 0 else None) ) @@ -1540,7 +1539,7 @@ def _process_skypilot_pods(pods: List[Any]) -> List[Dict[str, Any]]: else: # Update start_time if this pod started earlier pod_start_time = pod.status.start_time - if pod_start_time: + if pod_start_time is not None: pod_start_time = pod_start_time.timestamp() if pod_start_time < clusters[cluster_name_on_cloud]['launched_at']: clusters[cluster_name_on_cloud]['launched_at'] = pod_start_time @@ -1672,9 +1671,9 @@ def status(all: bool, refresh: bool, ip: bool, endpoints: bool, cluster statuses from the cloud providers. 
""" if kubernetes: - + context = kubernetes_utils.get_current_kube_config_context_name() try: - pods = kubernetes_utils.get_skypilot_pods() + pods = kubernetes_utils.get_skypilot_pods(context) except Exception as e: with ux_utils.print_exception_no_traceback(): raise ValueError( @@ -1708,9 +1707,15 @@ def status(all: bool, refresh: bool, ip: bool, endpoints: bool, unmanaged_clusters = [c for c in clusters if c['cluster_name'] not in managed_job_cluster_names] + + click.echo( + f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' + f'Kubernetes cluster state (context: {context})' + f'{colorama.Style.RESET_ALL}') # Display clusters and jobs. click.echo( - f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}SkyPilot Clusters on Kubernetes' + f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' + f'SkyPilot Clusters' f'{colorama.Style.RESET_ALL}') status_utils.show_kubernetes_status_table(unmanaged_clusters, all) diff --git a/sky/jobs/core.py b/sky/jobs/core.py index 12d5f83817e..8da235fd34e 100644 --- a/sky/jobs/core.py +++ b/sky/jobs/core.py @@ -138,12 +138,16 @@ def launch( _disable_controller_check=True) -def queue_kubernetes(pod_name: str, refresh: bool, skip_finished: bool = False) -> List[Dict[str, Any]]: +def queue_kubernetes(pod_name: str, + refresh: bool, + context: Optional[str] = None, + skip_finished: bool = False) -> List[Dict[str, Any]]: """Gets the jobs queue from the kubernetes cluster by reconstructing a cluster handle and running the appropriate code on the head node. Args: pod_name: refresh: + context: Kubernetes context to use skip_finished: Returns: @@ -153,7 +157,7 @@ def queue_kubernetes(pod_name: str, refresh: bool, skip_finished: bool = False) from sky import provision as provision_lib cluster_name = pod_name.strip('-head').rsplit('-', 1)[0] - provider_config = {} # TODO: Specify context and namespace here. 
+ provider_config = {'context': context} instances = {pod_name: None} cluster_info = common.ClusterInfo(provider_name='kubernetes', head_instance_id=pod_name, provider_config=provider_config, instances=instances) managed_jobs_runner = provision_lib.get_command_runners('kubernetes', diff --git a/sky/utils/cli_utils/status_utils.py b/sky/utils/cli_utils/status_utils.py index 070333a3bf2..8419a70087e 100644 --- a/sky/utils/cli_utils/status_utils.py +++ b/sky/utils/cli_utils/status_utils.py @@ -319,6 +319,7 @@ def _get_estimated_cost_for_cost_report( return f'$ {cost:.2f}' + def show_kubernetes_status_table(clusters: List[Any], show_all: bool) -> None: """Compute cluster table values and display for Kubernetes clusters.""" status_columns = [ @@ -328,13 +329,17 @@ def show_kubernetes_status_table(clusters: List[Any], show_all: bool) -> None: StatusColumn('RESOURCES', lambda c: c['resources_str'], trunc_length=70 if not show_all else 0), # StatusColumn('PODS', lambda c: len(c['pods'])), StatusColumn('STATUS', lambda c: c['status'].colored_str()), - # Add more columns as needed + # TODO(romilb): We should consider adding POD_NAME field here when --all + # is passed to help users fetch pod name programmatically. 
] columns = [col.name for col in status_columns if col.show_by_default or show_all] cluster_table = log_utils.create_table(columns) - for cluster in clusters: + # Sort table by user, then by cluster name + sorted_clusters = sorted(clusters, key=lambda c: (c['user'], c['cluster_name'])) + + for cluster in sorted_clusters: row = [] for status_column in status_columns: if status_column.show_by_default or show_all: From 175204703d572439d18b2501dd95756ac918bfa5 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Tue, 8 Oct 2024 17:18:17 -0700 Subject: [PATCH 05/23] linting --- sky/cli.py | 89 +++++++++++++++++------------ sky/jobs/core.py | 19 ++++-- sky/jobs/utils.py | 3 +- sky/provision/kubernetes/utils.py | 4 +- sky/utils/cli_utils/status_utils.py | 21 ++++--- 5 files changed, 84 insertions(+), 52 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index 942bf65b599..3249d68c32a 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -1458,18 +1458,23 @@ def _get_services(service_names: Optional[List[str]], return num_services, msg -def _process_skypilot_pods(pods: List[Any]) -> List[Dict[str, Any]]: +def _process_skypilot_pods( + pods: List[Any], + context: Optional[str] = None +) -> Tuple[List[Dict[Any, Any]], Dict[str, Any], Dict[str, Any]]: """Process SkyPilot pods and group them by cluster.""" # pylint: disable=import-outside-toplevel - from sky import resources as resources_lib from sky import clouds + from sky import resources as resources_lib clusters: Dict[str, Dict] = {} jobs_controllers: Dict[str, Dict] = {} serve_controllers: Dict[str, Dict] = {} for pod in pods: cluster_name_on_cloud = pod.metadata.labels.get('skypilot-cluster') - cluster_name = cluster_name_on_cloud.rsplit('-', 1)[0] # Remove the user hash to get cluster name (e.g., mycluster-2ea4) + cluster_name = cluster_name_on_cloud.rsplit( + '-', 1 + )[0] # Remove the user hash to get cluster name (e.g., mycluster-2ea4) # Check if name is name of a controller # Can't use 
controller_utils.Controllers.from_name(cluster_name) @@ -1500,16 +1505,17 @@ def _process_skypilot_pods(pods: List[Any]) -> List[Dict[str, Any]]: cpu_request = kubernetes_utils.parse_cpu_or_gpu_resource( pod.spec.containers[0].resources.requests.get('cpu', '0')) memory_request = kubernetes_utils.parse_memory_resource( - pod.spec.containers[0].resources.requests.get('memory', - '0'), unit='G') + pod.spec.containers[0].resources.requests.get('memory', '0'), + unit='G') gpu_count = kubernetes_utils.parse_cpu_or_gpu_resource( - pod.spec.containers[0].resources.requests.get('nvidia.com/gpu', - '0')) + pod.spec.containers[0].resources.requests.get( + 'nvidia.com/gpu', '0')) if gpu_count > 0: - # TODO: We should pass context here - context = None label_formatter, _ = kubernetes_utils.detect_gpu_label_formatter( context) + assert label_formatter is not None, ( + 'GPU label formatter cannot be None if there are pods ' + f'requesting GPUs: {pod.metadata.name}') gpu_label = label_formatter.get_label_key() # Get GPU name from pod node selector if pod.spec.node_selector is not None: @@ -1520,9 +1526,8 @@ def _process_skypilot_pods(pods: List[Any]) -> List[Dict[str, Any]]: cloud=clouds.Kubernetes(), cpus=int(cpu_request), memory=int(memory_request), - accelerators=( - f'{gpu_name}:{gpu_count}' if gpu_count > 0 else None) - ) + accelerators=(f'{gpu_name}:{gpu_count}' + if gpu_count > 0 else None)) if pod.status.phase == 'Pending': # If pod is pending, do not show it in the status continue @@ -1541,8 +1546,10 @@ def _process_skypilot_pods(pods: List[Any]) -> List[Dict[str, Any]]: pod_start_time = pod.status.start_time if pod_start_time is not None: pod_start_time = pod_start_time.timestamp() - if pod_start_time < clusters[cluster_name_on_cloud]['launched_at']: - clusters[cluster_name_on_cloud]['launched_at'] = pod_start_time + if pod_start_time < clusters[cluster_name_on_cloud][ + 'launched_at']: + clusters[cluster_name_on_cloud][ + 'launched_at'] = pod_start_time 
clusters[cluster_name_on_cloud]['pods'].append(pod) # Update resources_str in clusters: for cluster_name, cluster in clusters.items(): @@ -1552,6 +1559,7 @@ def _process_skypilot_pods(pods: List[Any]) -> List[Dict[str, Any]]: cluster['resources_str'] = resources_str return list(clusters.values()), jobs_controllers, serve_controllers + @cli.command() @click.option('--all', '-a', @@ -1597,11 +1605,12 @@ def _process_skypilot_pods(pods: List[Any]) -> List[Dict[str, Any]]: is_flag=True, required=False, help='Also show sky serve services, if any.') -@click.option('--kubernetes', - default=False, - is_flag=True, - required=False, - help='[Experimental] Show SkyPilot clusters from all users on Kubernetes.') +@click.option( + '--kubernetes', + default=False, + is_flag=True, + required=False, + help='[Experimental] Show SkyPilot clusters from all users on Kubernetes.') @click.argument('clusters', required=False, type=str, @@ -1679,15 +1688,20 @@ def status(all: bool, refresh: bool, ip: bool, endpoints: bool, raise ValueError( f'Failed to get SkyPilot pods from Kubernetes: {str(e)}') - clusters, jobs_controllers, serve_controllers = _process_skypilot_pods(pods) + clusters, jobs_controllers, serve_controllers = _process_skypilot_pods( + pods, context) all_jobs = [] - for job_controller_name, job_controller_info in jobs_controllers.items(): + for job_controller_name, job_controller_info in jobs_controllers.items( + ): user = job_controller_info['user'] pod = job_controller_info['pods'][0] - with rich_utils.safe_status(f'[bold cyan]Checking for in-progress managed jobs for {user}[/]'): - jobs = managed_jobs.queue_kubernetes(pod.metadata.name, refresh=False) + with rich_utils.safe_status( + f'[bold cyan]Checking for in-progress managed jobs for {user}[/]' + ): + jobs = managed_jobs.queue_kubernetes(pod.metadata.name, + refresh=False) # Add user field to jobs for job in jobs: job['user'] = user @@ -1705,27 +1719,26 @@ def status(all: bool, refresh: bool, ip: bool, endpoints: 
bool, managed_cluster_name = f'{job["job_name"]}-{job["job_id"]}' managed_job_cluster_names.add(managed_cluster_name) - unmanaged_clusters = [c for c in clusters if c['cluster_name'] not in managed_job_cluster_names] - + unmanaged_clusters = [ + c for c in clusters + if c['cluster_name'] not in managed_job_cluster_names + ] - click.echo( - f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' - f'Kubernetes cluster state (context: {context})' - f'{colorama.Style.RESET_ALL}') + click.echo(f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' + f'Kubernetes cluster state (context: {context})' + f'{colorama.Style.RESET_ALL}') # Display clusters and jobs. - click.echo( - f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' - f'SkyPilot Clusters' - f'{colorama.Style.RESET_ALL}') + click.echo(f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' + f'SkyPilot Clusters' + f'{colorama.Style.RESET_ALL}') status_utils.show_kubernetes_status_table(unmanaged_clusters, all) if all_jobs: click.echo( - f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' - f'Managed jobs from {len(jobs_controllers)} users on Kubernetes' - f'{colorama.Style.RESET_ALL}') - msg = managed_jobs.format_job_table(all_jobs, - show_all=all) + f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' + f'Managed jobs from {len(jobs_controllers)} users on Kubernetes' + f'{colorama.Style.RESET_ALL}') + msg = managed_jobs.format_job_table(all_jobs, show_all=all) click.echo(msg) return # Using a pool with 2 worker to run the managed job query and sky serve diff --git a/sky/jobs/core.py b/sky/jobs/core.py index 8da235fd34e..5de5c90502e 100644 --- a/sky/jobs/core.py +++ b/sky/jobs/core.py @@ -153,15 +153,24 @@ def queue_kubernetes(pod_name: str, Returns: """ - from sky.provision import common from sky import provision as provision_lib + from sky.provision import common cluster_name = pod_name.strip('-head').rsplit('-', 1)[0] + # Create dummy cluster info to get the command runner. 
provider_config = {'context': context} - instances = {pod_name: None} - cluster_info = common.ClusterInfo(provider_name='kubernetes', head_instance_id=pod_name, provider_config=provider_config, instances=instances) - managed_jobs_runner = provision_lib.get_command_runners('kubernetes', - cluster_info)[0] + instances = { + pod_name: common.InstanceInfo(instance_id=pod_name, + internal_ip='', + external_ip='', + tags={}) + } # Internal IP is not required for Kubernetes + cluster_info = common.ClusterInfo(provider_name='kubernetes', + head_instance_id=pod_name, + provider_config=provider_config, + instances=instances) + managed_jobs_runner = provision_lib.get_command_runners( + 'kubernetes', cluster_info)[0] code = managed_job_utils.ManagedJobCodeGen.get_job_table() returncode, job_table_payload, stderr = managed_jobs_runner.run( diff --git a/sky/jobs/utils.py b/sky/jobs/utils.py index ae0cf4539d8..98b208f47c8 100644 --- a/sky/jobs/utils.py +++ b/sky/jobs/utils.py @@ -687,7 +687,8 @@ def get_hash(task): status_str += f' (task: {current_task_id})' job_values = [ - job_hash[1] if tasks_have_user else job_hash, # TODO: Clean this up + job_hash[1] + if tasks_have_user else job_hash, # TODO: Clean this up '', job_name, '-', diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index bf22bdee419..0e19718db24 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -1999,6 +1999,7 @@ def get_context_from_config(provider_config: Dict[str, Any]) -> Optional[str]: context = None return context + def get_skypilot_pods(context: Optional[str] = None) -> List[Any]: """Gets all SkyPilot pods in the Kubernetes cluster. @@ -2019,5 +2020,6 @@ def get_skypilot_pods(context: Optional[str] = None) -> List[Any]: raise exceptions.ResourcesUnavailableError( 'Timed out when trying to get SkyPilot pod info from Kubernetes cluster. ' 'Please check if the cluster is healthy and retry. 
To debug, run: ' - 'kubectl get pods --selector=skypilot-cluster --all-namespaces') from None + 'kubectl get pods --selector=skypilot-cluster --all-namespaces' + ) from None return pods diff --git a/sky/utils/cli_utils/status_utils.py b/sky/utils/cli_utils/status_utils.py index 8419a70087e..482a12e53f3 100644 --- a/sky/utils/cli_utils/status_utils.py +++ b/sky/utils/cli_utils/status_utils.py @@ -5,12 +5,12 @@ import colorama from sky import backends -from sky import status_lib from sky import clouds +from sky import status_lib +from sky.provision.kubernetes import utils as kubernetes_utils from sky.skylet import constants from sky.utils import log_utils from sky.utils import resources_utils -from sky.provision.kubernetes import utils as kubernetes_utils COMMAND_TRUNC_LENGTH = 25 NUM_COST_REPORT_LINES = 5 @@ -325,19 +325,26 @@ def show_kubernetes_status_table(clusters: List[Any], show_all: bool) -> None: status_columns = [ StatusColumn('USER', lambda c: c['user']), StatusColumn('NAME', lambda c: c['cluster_name']), - StatusColumn('LAUNCHED', lambda c: log_utils.readable_time_duration(c['launched_at'])), - StatusColumn('RESOURCES', lambda c: c['resources_str'], trunc_length=70 if not show_all else 0), + StatusColumn( + 'LAUNCHED', + lambda c: log_utils.readable_time_duration(c['launched_at'])), + StatusColumn('RESOURCES', + lambda c: c['resources_str'], + trunc_length=70 if not show_all else 0), # StatusColumn('PODS', lambda c: len(c['pods'])), StatusColumn('STATUS', lambda c: c['status'].colored_str()), # TODO(romilb): We should consider adding POD_NAME field here when --all # is passed to help users fetch pod name programmatically. 
] - columns = [col.name for col in status_columns if col.show_by_default or show_all] + columns = [ + col.name for col in status_columns if col.show_by_default or show_all + ] cluster_table = log_utils.create_table(columns) # Sort table by user, then by cluster name - sorted_clusters = sorted(clusters, key=lambda c: (c['user'], c['cluster_name'])) + sorted_clusters = sorted(clusters, + key=lambda c: (c['user'], c['cluster_name'])) for cluster in sorted_clusters: row = [] @@ -349,4 +356,4 @@ def show_kubernetes_status_table(clusters: List[Any], show_all: bool) -> None: if clusters: click.echo(cluster_table) else: - click.echo('No existing SkyPilot clusters on Kubernetes.') \ No newline at end of file + click.echo('No existing SkyPilot clusters on Kubernetes.') From 8d098cca3504a6a2b06a0625338c280b6c2fcad2 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Tue, 8 Oct 2024 18:34:58 -0700 Subject: [PATCH 06/23] wip --- sky/cli.py | 4 ++-- sky/jobs/core.py | 10 ++++++---- sky/jobs/utils.py | 1 - 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index 3249d68c32a..cbfeaeb391c 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -1688,7 +1688,7 @@ def status(all: bool, refresh: bool, ip: bool, endpoints: bool, raise ValueError( f'Failed to get SkyPilot pods from Kubernetes: {str(e)}') - clusters, jobs_controllers, serve_controllers = _process_skypilot_pods( + all_clusters, jobs_controllers, serve_controllers = _process_skypilot_pods( pods, context) all_jobs = [] @@ -1720,7 +1720,7 @@ def status(all: bool, refresh: bool, ip: bool, endpoints: bool, managed_job_cluster_names.add(managed_cluster_name) unmanaged_clusters = [ - c for c in clusters + c for c in all_clusters if c['cluster_name'] not in managed_job_cluster_names ] diff --git a/sky/jobs/core.py b/sky/jobs/core.py index 5de5c90502e..2aaf5c9c129 100644 --- a/sky/jobs/core.py +++ b/sky/jobs/core.py @@ -160,10 +160,12 @@ def queue_kubernetes(pod_name: str, # Create dummy cluster info to get 
the command runner. provider_config = {'context': context} instances = { - pod_name: common.InstanceInfo(instance_id=pod_name, - internal_ip='', - external_ip='', - tags={}) + pod_name: [ + common.InstanceInfo(instance_id=pod_name, + internal_ip='', + external_ip='', + tags={}) + ] } # Internal IP is not required for Kubernetes cluster_info = common.ClusterInfo(provider_name='kubernetes', head_instance_id=pod_name, diff --git a/sky/jobs/utils.py b/sky/jobs/utils.py index 98b208f47c8..0f91064cd95 100644 --- a/sky/jobs/utils.py +++ b/sky/jobs/utils.py @@ -613,7 +613,6 @@ def get_hash(task): # The tasks within the same job_id are already sorted # by the task_id. jobs[get_hash(task)].append(task) - jobs = dict(jobs) status_counts: Dict[str, int] = collections.defaultdict(int) for job_tasks in jobs.values(): From dc1f46eba2b71a3233701535a9005bb18194a5e3 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Tue, 8 Oct 2024 18:50:12 -0700 Subject: [PATCH 07/23] lint --- sky/cli.py | 35 +++++++++++++---------------- sky/jobs/core.py | 9 +++----- sky/jobs/utils.py | 5 ----- sky/provision/kubernetes/utils.py | 4 ++-- sky/utils/cli_utils/status_utils.py | 2 -- 5 files changed, 20 insertions(+), 35 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index cbfeaeb391c..b12a2eb90f1 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -53,6 +53,7 @@ from sky import exceptions from sky import global_user_state from sky import jobs as managed_jobs +from sky import resources as resources_lib from sky import serve as serve_lib from sky import sky_logging from sky import status_lib @@ -1463,9 +1464,6 @@ def _process_skypilot_pods( context: Optional[str] = None ) -> Tuple[List[Dict[Any, Any]], Dict[str, Any], Dict[str, Any]]: """Process SkyPilot pods and group them by cluster.""" - # pylint: disable=import-outside-toplevel - from sky import clouds - from sky import resources as resources_lib clusters: Dict[str, Dict] = {} jobs_controllers: Dict[str, Dict] = {} serve_controllers: Dict[str, Dict] = 
{} @@ -1511,8 +1509,8 @@ def _process_skypilot_pods( pod.spec.containers[0].resources.requests.get( 'nvidia.com/gpu', '0')) if gpu_count > 0: - label_formatter, _ = kubernetes_utils.detect_gpu_label_formatter( - context) + label_formatter, _ = ( + kubernetes_utils.detect_gpu_label_formatter(context)) assert label_formatter is not None, ( 'GPU label formatter cannot be None if there are pods ' f'requesting GPUs: {pod.metadata.name}') @@ -1523,7 +1521,7 @@ def _process_skypilot_pods( pod.spec.node_selector.get(gpu_label)) resources = resources_lib.Resources( - cloud=clouds.Kubernetes(), + cloud=sky_clouds.Kubernetes(), cpus=int(cpu_request), memory=int(memory_request), accelerators=(f'{gpu_name}:{gpu_count}' @@ -1683,29 +1681,26 @@ def status(all: bool, refresh: bool, ip: bool, endpoints: bool, context = kubernetes_utils.get_current_kube_config_context_name() try: pods = kubernetes_utils.get_skypilot_pods(context) - except Exception as e: + except exceptions.ResourcesUnavailableError as e: with ux_utils.print_exception_no_traceback(): - raise ValueError( - f'Failed to get SkyPilot pods from Kubernetes: {str(e)}') + raise ValueError('Failed to get SkyPilot pods from ' + f'Kubernetes: {str(e)}') from e - all_clusters, jobs_controllers, serve_controllers = _process_skypilot_pods( - pods, context) + all_clusters, jobs_controllers, _ = (_process_skypilot_pods( + pods, context)) all_jobs = [] - for job_controller_name, job_controller_info in jobs_controllers.items( - ): + for _, job_controller_info in jobs_controllers.items(): user = job_controller_info['user'] pod = job_controller_info['pods'][0] - with rich_utils.safe_status( - f'[bold cyan]Checking for in-progress managed jobs for {user}[/]' - ): - jobs = managed_jobs.queue_kubernetes(pod.metadata.name, - refresh=False) + with rich_utils.safe_status('[bold cyan]Checking for in-progress ' + f'managed jobs for {user}[/]'): + job_list = managed_jobs.queue_kubernetes(pod.metadata.name) # Add user field to jobs - for job in 
jobs: + for job in job_list: job['user'] = user - all_jobs.extend(jobs) + all_jobs.extend(job_list) # Reconcile cluster state between managed jobs and clusters # We don't want to show SkyPilot clusters that are part of a managed job diff --git a/sky/jobs/core.py b/sky/jobs/core.py index 2aaf5c9c129..e5e2fed5489 100644 --- a/sky/jobs/core.py +++ b/sky/jobs/core.py @@ -9,6 +9,7 @@ import sky from sky import backends from sky import exceptions +from sky import provision as provision_lib from sky import sky_logging from sky import status_lib from sky import task as task_lib @@ -16,6 +17,7 @@ from sky.clouds.service_catalog import common as service_catalog_common from sky.jobs import constants as managed_job_constants from sky.jobs import utils as managed_job_utils +from sky.provision import common from sky.skylet import constants as skylet_constants from sky.usage import usage_lib from sky.utils import admin_policy_utils @@ -139,10 +141,9 @@ def launch( def queue_kubernetes(pod_name: str, - refresh: bool, context: Optional[str] = None, skip_finished: bool = False) -> List[Dict[str, Any]]: - """Gets the jobs queue from the kubernetes cluster by reconstructing a cluster handle and running the appropriate code on the head node. + """Gets the jobs queue from the kubernetes cluster. Args: pod_name: @@ -153,10 +154,6 @@ def queue_kubernetes(pod_name: str, Returns: """ - from sky import provision as provision_lib - from sky.provision import common - cluster_name = pod_name.strip('-head').rsplit('-', 1)[0] - # Create dummy cluster info to get the command runner. 
provider_config = {'context': context} instances = { diff --git a/sky/jobs/utils.py b/sky/jobs/utils.py index 0f91064cd95..2716d2a76ef 100644 --- a/sky/jobs/utils.py +++ b/sky/jobs/utils.py @@ -620,11 +620,6 @@ def get_hash(task): if not managed_job_status.is_terminal(): status_counts[managed_job_status.value] += 1 - if max_jobs is not None: - job_ids = sorted(jobs.keys(), reverse=True) - job_ids = job_ids[:max_jobs] - jobs = {job_id: jobs[job_id] for job_id in job_ids} - columns = [ 'ID', 'TASK', 'NAME', 'RESOURCES', 'SUBMITTED', 'TOT. DURATION', 'JOB DURATION', '#RECOVERIES', 'STATUS' diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index 0e19718db24..3924074838e 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -2004,7 +2004,7 @@ def get_skypilot_pods(context: Optional[str] = None) -> List[Any]: """Gets all SkyPilot pods in the Kubernetes cluster. Args: - context: The Kubernetes context to use. If None, uses the current context. + context: Kubernetes context to use. If None, uses the current context. Returns: A list of Kubernetes pod objects. @@ -2018,7 +2018,7 @@ def get_skypilot_pods(context: Optional[str] = None) -> List[Any]: _request_timeout=kubernetes.API_TIMEOUT).items except kubernetes.max_retry_error(): raise exceptions.ResourcesUnavailableError( - 'Timed out when trying to get SkyPilot pod info from Kubernetes cluster. ' + 'Timed out trying to get SkyPilot pods from Kubernetes cluster. ' 'Please check if the cluster is healthy and retry. 
To debug, run: ' 'kubectl get pods --selector=skypilot-cluster --all-namespaces' ) from None diff --git a/sky/utils/cli_utils/status_utils.py b/sky/utils/cli_utils/status_utils.py index 482a12e53f3..934ea6ee78b 100644 --- a/sky/utils/cli_utils/status_utils.py +++ b/sky/utils/cli_utils/status_utils.py @@ -5,9 +5,7 @@ import colorama from sky import backends -from sky import clouds from sky import status_lib -from sky.provision.kubernetes import utils as kubernetes_utils from sky.skylet import constants from sky.utils import log_utils from sky.utils import resources_utils From 686595069aa21dcd90f3cd4b0db41009a1a6c6ed Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Tue, 8 Oct 2024 18:55:29 -0700 Subject: [PATCH 08/23] cleanup --- sky/jobs/utils.py | 4 ++-- sky/utils/cli_utils/status_utils.py | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/sky/jobs/utils.py b/sky/jobs/utils.py index 2716d2a76ef..d46404bd4fd 100644 --- a/sky/jobs/utils.py +++ b/sky/jobs/utils.py @@ -680,9 +680,9 @@ def get_hash(task): if not managed_job_status.is_terminal(): status_str += f' (task: {current_task_id})' + job_id = job_hash[1] if tasks_have_user else job_hash job_values = [ - job_hash[1] - if tasks_have_user else job_hash, # TODO: Clean this up + job_id, '', job_name, '-', diff --git a/sky/utils/cli_utils/status_utils.py b/sky/utils/cli_utils/status_utils.py index 934ea6ee78b..b3392443d74 100644 --- a/sky/utils/cli_utils/status_utils.py +++ b/sky/utils/cli_utils/status_utils.py @@ -329,7 +329,6 @@ def show_kubernetes_status_table(clusters: List[Any], show_all: bool) -> None: StatusColumn('RESOURCES', lambda c: c['resources_str'], trunc_length=70 if not show_all else 0), - # StatusColumn('PODS', lambda c: len(c['pods'])), StatusColumn('STATUS', lambda c: c['status'].colored_str()), # TODO(romilb): We should consider adding POD_NAME field here when --all # is passed to help users fetch pod name programmatically. 
From 6856ca10d7c19d52e4ca333cbd7b9b6cb2e71bf5 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Tue, 8 Oct 2024 19:07:29 -0700 Subject: [PATCH 09/23] comment cleanup --- sky/cli.py | 26 +++++++++++++++----------- sky/utils/cli_utils/status_utils.py | 3 ++- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index b12a2eb90f1..93c19b14df7 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -1694,7 +1694,7 @@ def status(all: bool, refresh: bool, ip: bool, endpoints: bool, for _, job_controller_info in jobs_controllers.items(): user = job_controller_info['user'] pod = job_controller_info['pods'][0] - with rich_utils.safe_status('[bold cyan]Checking for in-progress ' + with rich_utils.safe_status('[bold cyan]Checking in-progress ' f'managed jobs for {user}[/]'): job_list = managed_jobs.queue_kubernetes(pod.metadata.name) # Add user field to jobs @@ -1702,12 +1702,16 @@ def status(all: bool, refresh: bool, ip: bool, endpoints: bool, job['user'] = user all_jobs.extend(job_list) - # Reconcile cluster state between managed jobs and clusters - # We don't want to show SkyPilot clusters that are part of a managed job - # in the main cluster list. - # Since we do not have any identifiers for which clusters are part of a - # managed job or service, we construct a list of managed job cluster - # names from the list of managed jobs and use that to exclude clusters. + # Reconcile cluster state between managed jobs and clusters: + # To maintain a clear separation between regular SkyPilot clusters + # and those from managed jobs, we need to exclude the latter from + # the main cluster list. + # We do this by reconstructing managed job cluster names from each + # job's name and ID. We then use this set to filter out managed + # clusters from the main cluster list. This is necessary because there + # are no identifiers distinguishing clusters from managed jobs from + # regular clusters. 
+ managed_job_cluster_names = set() for job in all_jobs: # Managed job cluster name is - @@ -1722,16 +1726,16 @@ def status(all: bool, refresh: bool, ip: bool, endpoints: bool, click.echo(f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' f'Kubernetes cluster state (context: {context})' f'{colorama.Style.RESET_ALL}') - # Display clusters and jobs. click.echo(f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' - f'SkyPilot Clusters' + f'SkyPilot clusters' f'{colorama.Style.RESET_ALL}') - status_utils.show_kubernetes_status_table(unmanaged_clusters, all) + status_utils.show_kubernetes_cluster_status_table( + unmanaged_clusters, all) if all_jobs: click.echo( f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' - f'Managed jobs from {len(jobs_controllers)} users on Kubernetes' + f'Managed jobs from {len(jobs_controllers)} users' f'{colorama.Style.RESET_ALL}') msg = managed_jobs.format_job_table(all_jobs, show_all=all) click.echo(msg) diff --git a/sky/utils/cli_utils/status_utils.py b/sky/utils/cli_utils/status_utils.py index b3392443d74..edf5c9120b2 100644 --- a/sky/utils/cli_utils/status_utils.py +++ b/sky/utils/cli_utils/status_utils.py @@ -318,7 +318,8 @@ def _get_estimated_cost_for_cost_report( return f'$ {cost:.2f}' -def show_kubernetes_status_table(clusters: List[Any], show_all: bool) -> None: +def show_kubernetes_cluster_status_table(clusters: List[Any], + show_all: bool) -> None: """Compute cluster table values and display for Kubernetes clusters.""" status_columns = [ StatusColumn('USER', lambda c: c['user']), From 15fb4c527581539a629767905241eded35a192d4 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Tue, 8 Oct 2024 19:09:47 -0700 Subject: [PATCH 10/23] comment cleanup --- sky/cli.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sky/cli.py b/sky/cli.py index 93c19b14df7..626d4cfb6f5 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -1463,7 +1463,6 @@ def _process_skypilot_pods( pods: List[Any], context: Optional[str] = None ) -> Tuple[List[Dict[Any, Any]], Dict[str, Any], 
Dict[str, Any]]: - """Process SkyPilot pods and group them by cluster.""" clusters: Dict[str, Dict] = {} jobs_controllers: Dict[str, Dict] = {} serve_controllers: Dict[str, Dict] = {} From d90939408a6d9cc4e4ca82cefab45d1f0a621138 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Tue, 8 Oct 2024 19:10:15 -0700 Subject: [PATCH 11/23] comment cleanup --- sky/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/cli.py b/sky/cli.py index 626d4cfb6f5..ffe09754de8 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -1473,7 +1473,7 @@ def _process_skypilot_pods( '-', 1 )[0] # Remove the user hash to get cluster name (e.g., mycluster-2ea4) - # Check if name is name of a controller + # Check if cluster name is name of a controller # Can't use controller_utils.Controllers.from_name(cluster_name) # because hash is different across users if 'controller' in cluster_name_on_cloud: From 5b0605a36f41304f42e6b7582e483d11b94bca73 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Tue, 8 Oct 2024 19:12:36 -0700 Subject: [PATCH 12/23] update docst --- sky/jobs/core.py | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/sky/jobs/core.py b/sky/jobs/core.py index e5e2fed5489..edf0e58bcba 100644 --- a/sky/jobs/core.py +++ b/sky/jobs/core.py @@ -143,16 +143,31 @@ def launch( def queue_kubernetes(pod_name: str, context: Optional[str] = None, skip_finished: bool = False) -> List[Dict[str, Any]]: - """Gets the jobs queue from the kubernetes cluster. + """Gets the jobs queue from a specific controller pod. Args: - pod_name: - refresh: - context: Kubernetes context to use - skip_finished: + pod_name (str): The name of the controller pod to query for jobs. + context (Optional[str]): The Kubernetes context to use. If None, the current context is used. + skip_finished (bool): If True, exclude finished jobs from the returned list. 
Returns: + [ + { + 'job_id': int, + 'job_name': str, + 'resources': str, + 'submitted_at': (float) timestamp of submission, + 'end_at': (float) timestamp of end, + 'duration': (float) duration in seconds, + 'recovery_count': (int) Number of retries, + 'status': (sky.jobs.ManagedJobStatus) of the job, + 'cluster_resources': (str) resources of the cluster, + 'region': (str) region of the cluster, + } + ] + Raises: + RuntimeError: If there's an error fetching the managed jobs from the Kubernetes cluster. """ # Create dummy cluster info to get the command runner. provider_config = {'context': context} From 5b8e0e42b1debe7a57d1bc56723fd1135d405739 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Tue, 8 Oct 2024 19:13:39 -0700 Subject: [PATCH 13/23] update docstr --- sky/cli.py | 7 +++---- sky/jobs/core.py | 5 +++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index ffe09754de8..08d675b9590 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -1732,10 +1732,9 @@ def status(all: bool, refresh: bool, ip: bool, endpoints: bool, unmanaged_clusters, all) if all_jobs: - click.echo( - f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' - f'Managed jobs from {len(jobs_controllers)} users' - f'{colorama.Style.RESET_ALL}') + click.echo(f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' + f'Managed jobs from {len(jobs_controllers)} users' + f'{colorama.Style.RESET_ALL}') msg = managed_jobs.format_job_table(all_jobs, show_all=all) click.echo(msg) return diff --git a/sky/jobs/core.py b/sky/jobs/core.py index edf0e58bcba..07dc2b9e129 100644 --- a/sky/jobs/core.py +++ b/sky/jobs/core.py @@ -147,8 +147,9 @@ def queue_kubernetes(pod_name: str, Args: pod_name (str): The name of the controller pod to query for jobs. - context (Optional[str]): The Kubernetes context to use. If None, the current context is used. - skip_finished (bool): If True, exclude finished jobs from the returned list. + context (Optional[str]): The Kubernetes context to use. 
If None, the + current context is used. + skip_finished (bool): If True, does not return finished jobs. Returns: [ From 7d8ab11c8c7a64d47a8af1743da6c14f95f5e6a6 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Tue, 8 Oct 2024 23:34:50 -0700 Subject: [PATCH 14/23] update docstr --- sky/jobs/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/jobs/core.py b/sky/jobs/core.py index 07dc2b9e129..a92aee74d58 100644 --- a/sky/jobs/core.py +++ b/sky/jobs/core.py @@ -168,7 +168,7 @@ def queue_kubernetes(pod_name: str, ] Raises: - RuntimeError: If there's an error fetching the managed jobs from the Kubernetes cluster. + RuntimeError: If there's an error fetching the managed jobs. """ # Create dummy cluster info to get the command runner. provider_config = {'context': context} From c3e773f7cd487fc1d19a324aad16182bd68b7edc Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Wed, 9 Oct 2024 15:07:31 -0700 Subject: [PATCH 15/23] refactor to avoid cyclic import --- sky/cli.py | 215 ++++++++-------------------- sky/data/storage_utils.py | 7 +- sky/utils/cli_utils/status_utils.py | 127 +++++++++++++--- sky/utils/common_utils.py | 19 +++ 4 files changed, 187 insertions(+), 181 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index 08d675b9590..051b14005de 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -1459,102 +1459,62 @@ def _get_services(service_names: Optional[List[str]], return num_services, msg -def _process_skypilot_pods( - pods: List[Any], - context: Optional[str] = None -) -> Tuple[List[Dict[Any, Any]], Dict[str, Any], Dict[str, Any]]: - clusters: Dict[str, Dict] = {} - jobs_controllers: Dict[str, Dict] = {} - serve_controllers: Dict[str, Dict] = {} - - for pod in pods: - cluster_name_on_cloud = pod.metadata.labels.get('skypilot-cluster') - cluster_name = cluster_name_on_cloud.rsplit( - '-', 1 - )[0] # Remove the user hash to get cluster name (e.g., mycluster-2ea4) - - # Check if cluster name is name of a controller - # Can't use 
controller_utils.Controllers.from_name(cluster_name) - # because hash is different across users - if 'controller' in cluster_name_on_cloud: - start_time = pod.status.start_time.timestamp() - controller_info = { - 'cluster_name_on_cloud': cluster_name_on_cloud, - 'cluster_name': cluster_name, - 'user': pod.metadata.labels.get('skypilot-user'), - 'status': status_lib.ClusterStatus.UP, - # Assuming UP if pod exists - 'pods': [pod], - 'launched_at': start_time - } - if 'sky-jobs-controller' in cluster_name_on_cloud: - jobs_controllers[cluster_name_on_cloud] = controller_info - elif 'sky-serve-controller' in cluster_name_on_cloud: - serve_controllers[cluster_name_on_cloud] = controller_info - - if cluster_name_on_cloud not in clusters: - # Parse the start time for the cluster - start_time = pod.status.start_time - if start_time is not None: - start_time = pod.status.start_time.timestamp() - - # Parse resources - cpu_request = kubernetes_utils.parse_cpu_or_gpu_resource( - pod.spec.containers[0].resources.requests.get('cpu', '0')) - memory_request = kubernetes_utils.parse_memory_resource( - pod.spec.containers[0].resources.requests.get('memory', '0'), - unit='G') - gpu_count = kubernetes_utils.parse_cpu_or_gpu_resource( - pod.spec.containers[0].resources.requests.get( - 'nvidia.com/gpu', '0')) - if gpu_count > 0: - label_formatter, _ = ( - kubernetes_utils.detect_gpu_label_formatter(context)) - assert label_formatter is not None, ( - 'GPU label formatter cannot be None if there are pods ' - f'requesting GPUs: {pod.metadata.name}') - gpu_label = label_formatter.get_label_key() - # Get GPU name from pod node selector - if pod.spec.node_selector is not None: - gpu_name = label_formatter.get_accelerator_from_label_value( - pod.spec.node_selector.get(gpu_label)) - - resources = resources_lib.Resources( - cloud=sky_clouds.Kubernetes(), - cpus=int(cpu_request), - memory=int(memory_request), - accelerators=(f'{gpu_name}:{gpu_count}' - if gpu_count > 0 else None)) - if 
pod.status.phase == 'Pending': - # If pod is pending, do not show it in the status - continue - - clusters[cluster_name_on_cloud] = { - 'cluster_name_on_cloud': cluster_name_on_cloud, - 'cluster_name': cluster_name, - 'user': pod.metadata.labels.get('skypilot-user'), - 'status': status_lib.ClusterStatus.UP, - 'pods': [], - 'launched_at': start_time, - 'resources': resources, - } - else: - # Update start_time if this pod started earlier - pod_start_time = pod.status.start_time - if pod_start_time is not None: - pod_start_time = pod_start_time.timestamp() - if pod_start_time < clusters[cluster_name_on_cloud][ - 'launched_at']: - clusters[cluster_name_on_cloud][ - 'launched_at'] = pod_start_time - clusters[cluster_name_on_cloud]['pods'].append(pod) - # Update resources_str in clusters: - for cluster_name, cluster in clusters.items(): - resources = cluster['resources'] - num_pods = len(cluster['pods']) - resources_str = f'{num_pods}x {resources}' - cluster['resources_str'] = resources_str - return list(clusters.values()), jobs_controllers, serve_controllers +def _status_kubernetes(show_all: bool): + context = kubernetes_utils.get_current_kube_config_context_name() + try: + pods = kubernetes_utils.get_skypilot_pods(context) + except exceptions.ResourcesUnavailableError as e: + with ux_utils.print_exception_no_traceback(): + raise ValueError('Failed to get SkyPilot pods from ' + f'Kubernetes: {str(e)}') from e + all_clusters, jobs_controllers, serve_controllers = ( + status_utils.process_skypilot_pods(pods, context)) + all_jobs = [] + for _, job_controller_info in jobs_controllers.items(): + user = job_controller_info['user'] + pod = job_controller_info['pods'][0] + with rich_utils.safe_status('[bold cyan]Checking in-progress ' + f'managed jobs for {user}[/]'): + job_list = managed_jobs.queue_kubernetes(pod.metadata.name) + # Add user field to jobs + for job in job_list: + job['user'] = user + all_jobs.extend(job_list) + # Reconcile cluster state between managed jobs and 
clusters: + # To maintain a clear separation between regular SkyPilot clusters + # and those from managed jobs, we need to exclude the latter from + # the main cluster list. + # We do this by reconstructing managed job cluster names from each + # job's name and ID. We then use this set to filter out managed + # clusters from the main cluster list. This is necessary because there + # are no identifiers distinguishing clusters from managed jobs from + # regular clusters. + managed_job_cluster_names = set() + for job in all_jobs: + # Managed job cluster name is <job_name>-<job_id> + managed_cluster_name = f'{job["job_name"]}-{job["job_id"]}' + managed_job_cluster_names.add(managed_cluster_name) + unmanaged_clusters = [ + c for c in all_clusters + if c['cluster_name'] not in managed_job_cluster_names + ] + click.echo(f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' + f'Kubernetes cluster state (context: {context})' + f'{colorama.Style.RESET_ALL}') + status_utils.show_kubernetes_cluster_status_table( + unmanaged_clusters, show_all) + if all_jobs: + click.echo(f'\n{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' + f'Managed jobs from {len(jobs_controllers)} users' + f'{colorama.Style.RESET_ALL}') + msg = managed_jobs.format_job_table(all_jobs, show_all=show_all) + click.echo(msg) + if serve_controllers: + # TODO: Parse serve controllers and show services separately. + # Currently we show a hint that services are shown as clusters. + click.echo(f'\nHint: SkyServe controllers detected in the cluster. ' + 'SkyServe service replicas will be shown as SkyPilot ' + 'clusters') @cli.command() @@ -1603,7 +1563,7 @@ def _process_skypilot_pods( required=False, help='Also show sky serve services, if any.') @click.option( - '--kubernetes', + '--kubernetes', '--k8s', default=False, is_flag=True, required=False, @@ -1677,66 +1637,7 @@ def status(all: bool, refresh: bool, ip: bool, endpoints: bool, cluster statuses from the cloud providers. 
""" if kubernetes: - context = kubernetes_utils.get_current_kube_config_context_name() - try: - pods = kubernetes_utils.get_skypilot_pods(context) - except exceptions.ResourcesUnavailableError as e: - with ux_utils.print_exception_no_traceback(): - raise ValueError('Failed to get SkyPilot pods from ' - f'Kubernetes: {str(e)}') from e - - all_clusters, jobs_controllers, _ = (_process_skypilot_pods( - pods, context)) - - all_jobs = [] - - for _, job_controller_info in jobs_controllers.items(): - user = job_controller_info['user'] - pod = job_controller_info['pods'][0] - with rich_utils.safe_status('[bold cyan]Checking in-progress ' - f'managed jobs for {user}[/]'): - job_list = managed_jobs.queue_kubernetes(pod.metadata.name) - # Add user field to jobs - for job in job_list: - job['user'] = user - all_jobs.extend(job_list) - - # Reconcile cluster state between managed jobs and clusters: - # To maintain a clear separation between regular SkyPilot clusters - # and those from managed jobs, we need to exclude the latter from - # the main cluster list. - # We do this by reconstructing managed job cluster names from each - # job's name and ID. We then use this set to filter out managed - # clusters from the main cluster list. This is necessary because there - # are no identifiers distinguishing clusters from managed jobs from - # regular clusters. 
- - managed_job_cluster_names = set() - for job in all_jobs: - # Managed job cluster name is <job_name>-<job_id> - managed_cluster_name = f'{job["job_name"]}-{job["job_id"]}' - managed_job_cluster_names.add(managed_cluster_name) - - unmanaged_clusters = [ - c for c in all_clusters - if c['cluster_name'] not in managed_job_cluster_names - ] - - click.echo(f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' - f'Kubernetes cluster state (context: {context})' - f'{colorama.Style.RESET_ALL}') - click.echo(f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' - f'SkyPilot clusters' - f'{colorama.Style.RESET_ALL}') - status_utils.show_kubernetes_cluster_status_table( - unmanaged_clusters, all) - - if all_jobs: - click.echo(f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' - f'Managed jobs from {len(jobs_controllers)} users' - f'{colorama.Style.RESET_ALL}') - msg = managed_jobs.format_job_table(all_jobs, show_all=all) - click.echo(msg) + _status_kubernetes(all) return # Using a pool with 2 worker to run the managed job query and sky serve # service query in parallel to speed up. 
The pool provides a AsyncResult diff --git a/sky/data/storage_utils.py b/sky/data/storage_utils.py index 245325806a3..89ff904bdb2 100644 --- a/sky/data/storage_utils.py +++ b/sky/data/storage_utils.py @@ -6,10 +6,11 @@ import colorama +import sky.utils.common_utils from sky import exceptions from sky import sky_logging +from sky.utils import common_utils from sky.utils import log_utils -from sky.utils.cli_utils import status_utils logger = sky_logging.init_logger(__name__) @@ -43,8 +44,8 @@ def format_storage_table(storages: List[Dict[str, Any]], if show_all: command = row['last_use'] else: - command = status_utils.truncate_long_string( - row['last_use'], status_utils.COMMAND_TRUNC_LENGTH) + command = sky.utils.common_utils.truncate_long_string( + row['last_use'], common_utils.COMMAND_TRUNC_LENGTH) storage_table.add_row([ # NAME row['name'], diff --git a/sky/utils/cli_utils/status_utils.py b/sky/utils/cli_utils/status_utils.py index edf5c9120b2..717eedb86cb 100644 --- a/sky/utils/cli_utils/status_utils.py +++ b/sky/utils/cli_utils/status_utils.py @@ -1,14 +1,17 @@ """Utilities for sky status.""" -from typing import Any, Callable, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional, Tuple import click import colorama from sky import backends from sky import status_lib +from sky import resources as resources_lib from sky.skylet import constants +from sky.utils import common_utils from sky.utils import log_utils from sky.utils import resources_utils +from sky.provision.kubernetes import utils as kubernetes_utils COMMAND_TRUNC_LENGTH = 25 NUM_COST_REPORT_LINES = 5 @@ -19,25 +22,6 @@ _ClusterCostReportRecord = Dict[str, Any] -def truncate_long_string(s: str, max_length: int = 35) -> str: - if len(s) <= max_length: - return s - splits = s.split(' ') - if len(splits[0]) > max_length: - return splits[0][:max_length] + '...' # Use '…'? - # Truncate on word boundary. 
- i = 0 - total = 0 - for i, part in enumerate(splits): - total += len(part) - if total >= max_length: - break - prefix = ' '.join(splits[:i]) - if len(prefix) < max_length: - prefix += s[len(prefix):max_length] - return prefix + '...' - - class StatusColumn: """One column of the displayed cluster table""" @@ -54,7 +38,7 @@ def __init__(self, def calc(self, record): val = self.calc_func(record) if self.trunc_length != 0: - val = truncate_long_string(str(val), self.trunc_length) + val = common_utils.truncate_long_string(str(val), self.trunc_length) return val @@ -352,6 +336,107 @@ def show_kubernetes_cluster_status_table(clusters: List[Any], cluster_table.add_row(row) if clusters: + click.echo(f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' + f'SkyPilot clusters' + f'{colorama.Style.RESET_ALL}') click.echo(cluster_table) else: click.echo('No existing SkyPilot clusters on Kubernetes.') + + +def process_skypilot_pods( + pods: List[Any], + context: Optional[str] = None +) -> Tuple[List[Dict[Any, Any]], Dict[str, Any], Dict[str, Any]]: + clusters: Dict[str, Dict] = {} + jobs_controllers: Dict[str, Dict] = {} + serve_controllers: Dict[str, Dict] = {} + + for pod in pods: + cluster_name_on_cloud = pod.metadata.labels.get('skypilot-cluster') + cluster_name = cluster_name_on_cloud.rsplit( + '-', 1 + )[0] # Remove the user hash to get cluster name (e.g., mycluster-2ea4) + + # Check if cluster name is name of a controller + # Can't use controller_utils.Controllers.from_name(cluster_name) + # because hash is different across users + if 'controller' in cluster_name_on_cloud: + start_time = pod.status.start_time.timestamp() + controller_info = { + 'cluster_name_on_cloud': cluster_name_on_cloud, + 'cluster_name': cluster_name, + 'user': pod.metadata.labels.get('skypilot-user'), + 'status': status_lib.ClusterStatus.UP, + # Assuming UP if pod exists + 'pods': [pod], + 'launched_at': start_time + } + if 'sky-jobs-controller' in cluster_name_on_cloud: + 
jobs_controllers[cluster_name_on_cloud] = controller_info + elif 'sky-serve-controller' in cluster_name_on_cloud: + serve_controllers[cluster_name_on_cloud] = controller_info + + if cluster_name_on_cloud not in clusters: + # Parse the start time for the cluster + start_time = pod.status.start_time + if start_time is not None: + start_time = pod.status.start_time.timestamp() + + # Parse resources + cpu_request = kubernetes_utils.parse_cpu_or_gpu_resource( + pod.spec.containers[0].resources.requests.get('cpu', '0')) + memory_request = kubernetes_utils.parse_memory_resource( + pod.spec.containers[0].resources.requests.get('memory', '0'), + unit='G') + gpu_count = kubernetes_utils.parse_cpu_or_gpu_resource( + pod.spec.containers[0].resources.requests.get( + 'nvidia.com/gpu', '0')) + if gpu_count > 0: + label_formatter, _ = ( + kubernetes_utils.detect_gpu_label_formatter(context)) + assert label_formatter is not None, ( + 'GPU label formatter cannot be None if there are pods ' + f'requesting GPUs: {pod.metadata.name}') + gpu_label = label_formatter.get_label_key() + # Get GPU name from pod node selector + if pod.spec.node_selector is not None: + gpu_name = label_formatter.get_accelerator_from_label_value( + pod.spec.node_selector.get(gpu_label)) + + resources = resources_lib.Resources( + cloud=sky_clouds.Kubernetes(), + cpus=int(cpu_request), + memory=int(memory_request), + accelerators=(f'{gpu_name}:{gpu_count}' + if gpu_count > 0 else None)) + if pod.status.phase == 'Pending': + # If pod is pending, do not show it in the status + continue + + clusters[cluster_name_on_cloud] = { + 'cluster_name_on_cloud': cluster_name_on_cloud, + 'cluster_name': cluster_name, + 'user': pod.metadata.labels.get('skypilot-user'), + 'status': status_lib.ClusterStatus.UP, + 'pods': [], + 'launched_at': start_time, + 'resources': resources, + } + else: + # Update start_time if this pod started earlier + pod_start_time = pod.status.start_time + if pod_start_time is not None: + pod_start_time 
= pod_start_time.timestamp() + if pod_start_time < clusters[cluster_name_on_cloud][ + 'launched_at']: + clusters[cluster_name_on_cloud][ + 'launched_at'] = pod_start_time + clusters[cluster_name_on_cloud]['pods'].append(pod) + # Update resources_str in clusters: + for cluster_name, cluster in clusters.items(): + resources = cluster['resources'] + num_pods = len(cluster['pods']) + resources_str = f'{num_pods}x {resources}' + cluster['resources_str'] = resources_str + return list(clusters.values()), jobs_controllers, serve_controllers diff --git a/sky/utils/common_utils.py b/sky/utils/common_utils.py index dffe784cc33..6f95907255f 100644 --- a/sky/utils/common_utils.py +++ b/sky/utils/common_utils.py @@ -679,3 +679,22 @@ def new_func(*args, **kwargs): return func(*args, **kwargs) return new_func + + +def truncate_long_string(s: str, max_length: int = 35) -> str: + if len(s) <= max_length: + return s + splits = s.split(' ') + if len(splits[0]) > max_length: + return splits[0][:max_length] + '...' # Use '…'? + # Truncate on word boundary. + i = 0 + total = 0 + for i, part in enumerate(splits): + total += len(part) + if total >= max_length: + break + prefix = ' '.join(splits[:i]) + if len(prefix) < max_length: + prefix += s[len(prefix):max_length] + return prefix + '...' 
From f254643b55e5214c5d842244a2799c1c71960cb6 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Wed, 9 Oct 2024 15:14:34 -0700 Subject: [PATCH 16/23] lint --- sky/cli.py | 10 +++++----- sky/data/storage_utils.py | 7 ++++--- sky/utils/cli_utils/status_utils.py | 11 ++++++----- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index 051b14005de..3b9718164cf 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -53,7 +53,6 @@ from sky import exceptions from sky import global_user_state from sky import jobs as managed_jobs -from sky import resources as resources_lib from sky import serve as serve_lib from sky import sky_logging from sky import status_lib @@ -1501,8 +1500,8 @@ def _status_kubernetes(show_all: bool): click.echo(f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' f'Kubernetes cluster state (context: {context})' f'{colorama.Style.RESET_ALL}') - status_utils.show_kubernetes_cluster_status_table( - unmanaged_clusters, show_all) + status_utils.show_kubernetes_cluster_status_table(unmanaged_clusters, + show_all) if all_jobs: click.echo(f'\n{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' f'Managed jobs from {len(jobs_controllers)} users' @@ -1512,7 +1511,7 @@ def _status_kubernetes(show_all: bool): if serve_controllers: # TODO: Parse serve controllers and show services separately. # Currently we show a hint that services are shown as clusters. - click.echo(f'\nHint: SkyServe controllers detected in the cluster. ' + click.echo('\nHint: SkyServe controllers detected in the cluster. 
' 'SkyServe service replicas will be shown as SkyPilot ' 'clusters') @@ -1563,7 +1562,8 @@ def _status_kubernetes(show_all: bool): required=False, help='Also show sky serve services, if any.') @click.option( - '--kubernetes', '--k8s', + '--kubernetes', + '--k8s', default=False, is_flag=True, required=False, diff --git a/sky/data/storage_utils.py b/sky/data/storage_utils.py index 89ff904bdb2..d3e4bde7b01 100644 --- a/sky/data/storage_utils.py +++ b/sky/data/storage_utils.py @@ -6,7 +6,6 @@ import colorama -import sky.utils.common_utils from sky import exceptions from sky import sky_logging from sky.utils import common_utils @@ -20,6 +19,8 @@ 'to the cloud storage for {path!r}' 'due to the following error: {error_msg!r}') +LAST_USE_TRUNC_LENGTH = 25 + def format_storage_table(storages: List[Dict[str, Any]], show_all: bool = False) -> str: @@ -44,8 +45,8 @@ def format_storage_table(storages: List[Dict[str, Any]], if show_all: command = row['last_use'] else: - command = sky.utils.common_utils.truncate_long_string( - row['last_use'], common_utils.COMMAND_TRUNC_LENGTH) + command = common_utils.truncate_long_string(row['last_use'], + LAST_USE_TRUNC_LENGTH) storage_table.add_row([ # NAME row['name'], diff --git a/sky/utils/cli_utils/status_utils.py b/sky/utils/cli_utils/status_utils.py index 717eedb86cb..e7520b2a5e4 100644 --- a/sky/utils/cli_utils/status_utils.py +++ b/sky/utils/cli_utils/status_utils.py @@ -5,13 +5,14 @@ import colorama from sky import backends -from sky import status_lib +from sky import clouds as sky_clouds from sky import resources as resources_lib +from sky import status_lib +from sky.provision.kubernetes import utils as kubernetes_utils from sky.skylet import constants from sky.utils import common_utils from sky.utils import log_utils from sky.utils import resources_utils -from sky.provision.kubernetes import utils as kubernetes_utils COMMAND_TRUNC_LENGTH = 25 NUM_COST_REPORT_LINES = 5 @@ -345,8 +346,8 @@ def 
show_kubernetes_cluster_status_table(clusters: List[Any], def process_skypilot_pods( - pods: List[Any], - context: Optional[str] = None + pods: List[Any], + context: Optional[str] = None ) -> Tuple[List[Dict[Any, Any]], Dict[str, Any], Dict[str, Any]]: clusters: Dict[str, Dict] = {} jobs_controllers: Dict[str, Dict] = {} @@ -429,7 +430,7 @@ def process_skypilot_pods( if pod_start_time is not None: pod_start_time = pod_start_time.timestamp() if pod_start_time < clusters[cluster_name_on_cloud][ - 'launched_at']: + 'launched_at']: clusters[cluster_name_on_cloud][ 'launched_at'] = pod_start_time clusters[cluster_name_on_cloud]['pods'].append(pod) From 4cc7249941add64a1ce44d27132ee411eec465a4 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Wed, 9 Oct 2024 15:24:41 -0700 Subject: [PATCH 17/23] merge --- sky/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/cli.py b/sky/cli.py index 3b9718164cf..721fa34f996 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -1513,7 +1513,7 @@ def _status_kubernetes(show_all: bool): # Currently we show a hint that services are shown as clusters. click.echo('\nHint: SkyServe controllers detected in the cluster. 
' 'SkyServe service replicas will be shown as SkyPilot ' - 'clusters') + 'clusters.') @cli.command() From 20b34ff5d0e187fcdbaaa327aa3fb9a1550b6dd3 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Wed, 9 Oct 2024 15:42:43 -0700 Subject: [PATCH 18/23] Fix context name in sky show-gpus --- sky/cli.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index 721fa34f996..049233048ce 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -3181,7 +3181,10 @@ def _output(): print_section_titles = False # If cloud is kubernetes, we want to show real-time capacity if kubernetes_is_enabled and (cloud is None or cloud_is_kubernetes): - context = region + if region: + context = region + else: + context = kubernetes_utils.get_current_kube_config_context_name() try: # If --cloud kubernetes is not specified, we want to catch # the case where no GPUs are available on the cluster and @@ -3196,7 +3199,7 @@ def _output(): else: print_section_titles = True yield (f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' - f'Kubernetes GPUs (Context: {context})' + f'Kubernetes GPUs (context: {context})' f'{colorama.Style.RESET_ALL}\n') yield from k8s_realtime_table.get_string() k8s_node_table = _get_kubernetes_node_info_table(context) From fea765e14f0c415e99c90dfbd24807ca41cf0b48 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Wed, 9 Oct 2024 16:05:58 -0700 Subject: [PATCH 19/23] lint --- sky/cli.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sky/cli.py b/sky/cli.py index 049233048ce..b4bcc9126a9 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -3184,7 +3184,9 @@ def _output(): if region: context = region else: - context = kubernetes_utils.get_current_kube_config_context_name() + # If region is not specified, we use the current context + context = (kubernetes_utils. 
+ get_current_kube_config_context_name()) try: # If --cloud kubernetes is not specified, we want to catch # the case where no GPUs are available on the cluster and From a0a24b75babc2b3f49179b5eb3163b9d1263d8e8 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Wed, 9 Oct 2024 16:10:08 -0700 Subject: [PATCH 20/23] lint --- sky/cli.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index b4bcc9126a9..87953c5283c 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -3185,8 +3185,8 @@ def _output(): context = region else: # If region is not specified, we use the current context - context = (kubernetes_utils. - get_current_kube_config_context_name()) + context = ( + kubernetes_utils.get_current_kube_config_context_name()) try: # If --cloud kubernetes is not specified, we want to catch # the case where no GPUs are available on the cluster and From ae2c2446dafc540162d939f03b465940a2c695ce Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Thu, 10 Oct 2024 12:05:58 -0700 Subject: [PATCH 21/23] fixes --- sky/cli.py | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index 87953c5283c..f5b708a8384 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -1469,16 +1469,21 @@ def _status_kubernetes(show_all: bool): all_clusters, jobs_controllers, serve_controllers = ( status_utils.process_skypilot_pods(pods, context)) all_jobs = [] - for _, job_controller_info in jobs_controllers.items(): - user = job_controller_info['user'] - pod = job_controller_info['pods'][0] - with rich_utils.safe_status('[bold cyan]Checking in-progress ' - f'managed jobs for {user}[/]'): + with rich_utils.safe_status( + '[bold cyan]Checking in-progress managed jobs[/]') as spinner: + for i, (_, job_controller_info) in enumerate(jobs_controllers.items()): + user = job_controller_info['user'] + pod = job_controller_info['pods'][0] + status_message = ('[bold cyan]Checking in-progress ' + f'managed jobs for 
{user}') + if len(jobs_controllers) > 1: + status_message += f' ({i+1}/{len(jobs_controllers)})' + spinner.update(f'{status_message}[/]') job_list = managed_jobs.queue_kubernetes(pod.metadata.name) - # Add user field to jobs - for job in job_list: - job['user'] = user - all_jobs.extend(job_list) + # Add user field to jobs + for job in job_list: + job['user'] = user + all_jobs.extend(job_list) # Reconcile cluster state between managed jobs and clusters: # To maintain a clear separation between regular SkyPilot clusters # and those from managed jobs, we need to exclude the latter from @@ -1504,16 +1509,16 @@ def _status_kubernetes(show_all: bool): show_all) if all_jobs: click.echo(f'\n{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' - f'Managed jobs from {len(jobs_controllers)} users' + f'Managed jobs' f'{colorama.Style.RESET_ALL}') msg = managed_jobs.format_job_table(all_jobs, show_all=show_all) click.echo(msg) if serve_controllers: # TODO: Parse serve controllers and show services separately. # Currently we show a hint that services are shown as clusters. - click.echo('\nHint: SkyServe controllers detected in the cluster. ' - 'SkyServe service replicas will be shown as SkyPilot ' - 'clusters.') + click.echo(f'\n{colorama.Style.DIM}Hint: SkyServe replica pods are ' + 'shown in the "SkyPilot clusters" section.' 
+ f'{colorama.Style.RESET_ALL}') @cli.command() @@ -1567,7 +1572,8 @@ def _status_kubernetes(show_all: bool): default=False, is_flag=True, required=False, - help='[Experimental] Show SkyPilot clusters from all users on Kubernetes.') + help='[Experimental] Show all SkyPilot resources (including from other ' + 'users) in the current Kubernetes context.') @click.argument('clusters', required=False, type=str, From 96518263a9dbdbaa2c0ccd46e5c7027630f3fa4e Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Thu, 10 Oct 2024 13:13:28 -0700 Subject: [PATCH 22/23] fixes --- sky/cli.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index f5b708a8384..c65a2a0a131 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -1474,12 +1474,16 @@ def _status_kubernetes(show_all: bool): for i, (_, job_controller_info) in enumerate(jobs_controllers.items()): user = job_controller_info['user'] pod = job_controller_info['pods'][0] - status_message = ('[bold cyan]Checking in-progress ' - f'managed jobs for {user}') + status_message = ('[bold cyan]Checking managed jobs controller') if len(jobs_controllers) > 1: - status_message += f' ({i+1}/{len(jobs_controllers)})' + status_message += f's ({i+1}/{len(jobs_controllers)})' spinner.update(f'{status_message}[/]') - job_list = managed_jobs.queue_kubernetes(pod.metadata.name) + try: + job_list = managed_jobs.queue_kubernetes(pod.metadata.name) + except RuntimeError as e: + logger.warning('Failed to get managed jobs from controller ' + f'{pod.metadata.name}: {str(e)}') + job_list = [] # Add user field to jobs for job in job_list: job['user'] = user From 47bc5c2b31b9e5d5f1c11fa65c2fa25321388de0 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Thu, 10 Oct 2024 21:28:41 -0700 Subject: [PATCH 23/23] comments --- sky/cli.py | 8 +++++++- sky/data/storage_utils.py | 4 ++-- sky/jobs/__init__.py | 4 ++-- sky/jobs/core.py | 7 ++++--- sky/utils/cli_utils/status_utils.py | 24 +++++++++++++++++++++++- 
sky/utils/common_utils.py | 1 + 6 files changed, 39 insertions(+), 9 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index c65a2a0a131..fd1379813ab 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -1459,6 +1459,11 @@ def _get_services(service_names: Optional[List[str]], def _status_kubernetes(show_all: bool): + """Show all SkyPilot resources in the current Kubernetes context. + + Args: + show_all (bool): Show all job information (e.g., start time, failures). + """ context = kubernetes_utils.get_current_kube_config_context_name() try: pods = kubernetes_utils.get_skypilot_pods(context) @@ -1479,7 +1484,8 @@ def _status_kubernetes(show_all: bool): status_message += f's ({i+1}/{len(jobs_controllers)})' spinner.update(f'{status_message}[/]') try: - job_list = managed_jobs.queue_kubernetes(pod.metadata.name) + job_list = managed_jobs.queue_from_kubernetes_pod( + pod.metadata.name) except RuntimeError as e: logger.warning('Failed to get managed jobs from controller ' f'{pod.metadata.name}: {str(e)}') diff --git a/sky/data/storage_utils.py b/sky/data/storage_utils.py index dfe7c8b2a14..7b5bf48d5db 100644 --- a/sky/data/storage_utils.py +++ b/sky/data/storage_utils.py @@ -21,7 +21,7 @@ 'to the cloud storage for {path!r}' 'due to the following error: {error_msg!r}') -LAST_USE_TRUNC_LENGTH = 25 +_LAST_USE_TRUNC_LENGTH = 25 def format_storage_table(storages: List[Dict[str, Any]], @@ -48,7 +48,7 @@ def format_storage_table(storages: List[Dict[str, Any]], command = row['last_use'] else: command = common_utils.truncate_long_string(row['last_use'], - LAST_USE_TRUNC_LENGTH) + _LAST_USE_TRUNC_LENGTH) storage_table.add_row([ # NAME row['name'], diff --git a/sky/jobs/__init__.py b/sky/jobs/__init__.py index 8dcc7a399fb..5688ca7c7a2 100644 --- a/sky/jobs/__init__.py +++ b/sky/jobs/__init__.py @@ -8,7 +8,7 @@ from sky.jobs.core import cancel from sky.jobs.core import launch from sky.jobs.core import queue -from sky.jobs.core import queue_kubernetes +from sky.jobs.core import 
queue_from_kubernetes_pod from sky.jobs.core import tail_logs from sky.jobs.recovery_strategy import DEFAULT_RECOVERY_STRATEGY from sky.jobs.recovery_strategy import RECOVERY_STRATEGIES @@ -35,7 +35,7 @@ 'cancel', 'launch', 'queue', - 'queue_kubernetes', + 'queue_from_kubernetes_pod', 'tail_logs', # utils 'ManagedJobCodeGen', diff --git a/sky/jobs/core.py b/sky/jobs/core.py index a92aee74d58..2cfc2783b4b 100644 --- a/sky/jobs/core.py +++ b/sky/jobs/core.py @@ -140,9 +140,10 @@ def launch( _disable_controller_check=True) -def queue_kubernetes(pod_name: str, - context: Optional[str] = None, - skip_finished: bool = False) -> List[Dict[str, Any]]: +def queue_from_kubernetes_pod( + pod_name: str, + context: Optional[str] = None, + skip_finished: bool = False) -> List[Dict[str, Any]]: """Gets the jobs queue from a specific controller pod. Args: diff --git a/sky/utils/cli_utils/status_utils.py b/sky/utils/cli_utils/status_utils.py index e7520b2a5e4..09172f24814 100644 --- a/sky/utils/cli_utils/status_utils.py +++ b/sky/utils/cli_utils/status_utils.py @@ -342,13 +342,35 @@ def show_kubernetes_cluster_status_table(clusters: List[Any], f'{colorama.Style.RESET_ALL}') click.echo(cluster_table) else: - click.echo('No existing SkyPilot clusters on Kubernetes.') + click.echo('No SkyPilot resources found in the ' + 'active Kubernetes context.') def process_skypilot_pods( pods: List[Any], context: Optional[str] = None ) -> Tuple[List[Dict[Any, Any]], Dict[str, Any], Dict[str, Any]]: + """Process SkyPilot pods on k8s to extract cluster and controller info. + + Args: + pods: List of Kubernetes pod objects. + context: Kubernetes context name, used to detect GPU label formatter. + + Returns: + A tuple containing: + - List of dictionaries with cluster information. + - Dictionary of job controller information. + - Dictionary of serve controller information. 
+ + Each dictionary contains the following keys: + 'cluster_name_on_cloud': The cluster_name_on_cloud used by SkyPilot + 'cluster_name': The cluster name without the user hash + 'user': The user who created the cluster. Fetched from pod label + 'status': The cluster status (assumed UP if pod exists) + 'pods': List of pod objects in the cluster + 'launched_at': Timestamp of when the cluster was launched + 'resources': sky.Resources object for the cluster + """ clusters: Dict[str, Dict] = {} jobs_controllers: Dict[str, Dict] = {} serve_controllers: Dict[str, Dict] = {} diff --git a/sky/utils/common_utils.py b/sky/utils/common_utils.py index 6f95907255f..4a8e6aa37d6 100644 --- a/sky/utils/common_utils.py +++ b/sky/utils/common_utils.py @@ -682,6 +682,7 @@ def new_func(*args, **kwargs): def truncate_long_string(s: str, max_length: int = 35) -> str: + """Truncate a string to a maximum length, preserving whole words.""" if len(s) <= max_length: return s splits = s.split(' ')