From 421ec562033b2d74e4e42bcf961b872c753942d8 Mon Sep 17 00:00:00 2001 From: cblmemo Date: Thu, 7 Nov 2024 11:26:04 -0800 Subject: [PATCH 1/7] [Core][Docker] Support docker login on RunPod. --- sky/provision/runpod/instance.py | 4 +++- sky/provision/runpod/utils.py | 26 +++++++++++++++++++++++++- sky/setup_files/setup.py | 4 +++- sky/templates/runpod-ray.yml.j2 | 13 +++++++++++++ 4 files changed, 44 insertions(+), 3 deletions(-) diff --git a/sky/provision/runpod/instance.py b/sky/provision/runpod/instance.py index 1f297a4b2e0..24995d36a49 100644 --- a/sky/provision/runpod/instance.py +++ b/sky/provision/runpod/instance.py @@ -89,7 +89,9 @@ def run_instances(region: str, cluster_name_on_cloud: str, disk_size=config.node_config['DiskSize'], image_name=config.node_config['ImageId'], ports=config.ports_to_open_on_launch, - public_key=config.node_config['PublicKey']) + public_key=config.node_config['PublicKey'], + docker_login_config=config.provider_config.get( + 'docker_login_config')) except Exception as e: # pylint: disable=broad-except logger.warning(f'run_instances error: {e}') raise diff --git a/sky/provision/runpod/utils.py b/sky/provision/runpod/utils.py index f1587463e84..6346c962abc 100644 --- a/sky/provision/runpod/utils.py +++ b/sky/provision/runpod/utils.py @@ -6,6 +6,7 @@ from sky import sky_logging from sky.adaptors import runpod +from sky.provision import docker_utils from sky.skylet import constants from sky.utils import common_utils @@ -100,7 +101,8 @@ def list_instances() -> Dict[str, Dict[str, Any]]: def launch(name: str, instance_type: str, region: str, disk_size: int, - image_name: str, ports: Optional[List[int]], public_key: str) -> str: + image_name: str, ports: Optional[List[int]], public_key: str, + docker_login_config: Optional[Dict[str, str]]) -> str: """Launches an instance with the given parameters. Converts the instance_type to the RunPod GPU name, finds the specs for the @@ -142,6 +144,27 @@ def launch(name: str, instance_type: str, region: str, disk_size: int, if ports is not None: custom_ports_str = ''.join([f'{p}/tcp,' for p in ports]) + template_id = None + if docker_login_config is not None: + login_config = docker_utils.DockerLoginConfig(**docker_login_config) + # TODO(tian): The `name` argument seems only for display purpose but + # not specifying the registry server. Double check if that works for + # registries other than Docker Hub. + # TODO(tian): Delete the registry auth and template after the instance + # is terminated. + create_auth_resp = runpod.runpod.create_container_registry_auth( + name=f'{name}-registry-auth', + username=login_config.username, + password=login_config.password, + ) + registry_auth_id = create_auth_resp['id'] + create_template_resp = runpod.runpod.create_template( + name=f'{name}-template', + image_name=image_name, + registry_auth_id=registry_auth_id, + ) + template_id = create_template_resp['id'] + new_instance = runpod.runpod.create_pod( name=name, image_name=image_name, @@ -157,6 +180,7 @@ def launch(name: str, instance_type: str, region: str, disk_size: int, f'{constants.SKY_REMOTE_RAY_DASHBOARD_PORT}/http,' f'{constants.SKY_REMOTE_RAY_PORT}/http'), support_public_ip=True, + template_id=template_id, docker_args= f'bash -c \'echo {encoded} | base64 --decode > init.sh; bash init.sh\'') diff --git a/sky/setup_files/setup.py b/sky/setup_files/setup.py index 0fd6978ec03..5454e7fee26 100644 --- a/sky/setup_files/setup.py +++ b/sky/setup_files/setup.py @@ -234,7 +234,9 @@ def parse_readme(readme: str) -> str: 'oci': ['oci'] + local_ray, 'kubernetes': ['kubernetes>=20.0.0'], 'remote': remote, - 'runpod': ['runpod>=1.5.1'], + # For the container registry auth api. Reference: + # https://github.com/runpod/runpod-python/releases/tag/1.6.1 + 'runpod': ['runpod>=1.6.1'], 'fluidstack': [], # No dependencies needed for fluidstack 'cudo': ['cudo-compute>=0.1.10'], 'paperspace': [], # No dependencies needed for paperspace diff --git a/sky/templates/runpod-ray.yml.j2 b/sky/templates/runpod-ray.yml.j2 index 4d9b0637bd0..3e1dc3bffea 100644 --- a/sky/templates/runpod-ray.yml.j2 +++ b/sky/templates/runpod-ray.yml.j2 @@ -10,6 +10,19 @@ provider: module: sky.provision.runpod region: "{{region}}" disable_launch_config_check: true + # For RunPod, we directly set the image id for the docker as runtime environment + # support, thus we need to avoid the DockerInitializer detects the docker field + # and performs the initialization. Therefore we put the docker login config in + # the provider config here. + {%- if docker_login_config is not none %} + docker_login_config: + username: |- + {{docker_login_config.username}} + password: |- + {{docker_login_config.password}} + server: |- + {{docker_login_config.server}} + {%- endif %} auth: ssh_user: root From d7a0c3790039e24c7327e96d5fe702c61b9c70a5 Mon Sep 17 00:00:00 2001 From: cblmemo Date: Thu, 26 Dec 2024 18:14:25 -0800 Subject: [PATCH 2/7] nit --- sky/provision/runpod/utils.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sky/provision/runpod/utils.py b/sky/provision/runpod/utils.py index 8a5194ba426..9ac1b745147 100644 --- a/sky/provision/runpod/utils.py +++ b/sky/provision/runpod/utils.py @@ -141,10 +141,17 @@ def launch(name: str, instance_type: str, region: str, disk_size: int, # Use base64 to deal with the tricky quoting issues caused by runpod API. encoded = base64.b64encode(setup_cmd.encode('utf-8')).decode('utf-8') + docker_args = (f'bash -c \'echo {encoded} | base64 --decode > init.sh; ' + f'bash init.sh\'') + # Port 8081 is occupied for nginx in the base image. custom_ports_str = '' if ports is not None: custom_ports_str = ''.join([f'{p}/tcp,' for p in ports]) + ports_str = (f'22/tcp,' + f'{custom_ports_str}' + f'{constants.SKY_REMOTE_RAY_DASHBOARD_PORT}/http,' + f'{constants.SKY_REMOTE_RAY_PORT}/http') template_id = None if docker_login_config is not None: @@ -167,13 +174,6 @@ def launch(name: str, instance_type: str, region: str, disk_size: int, ) template_id = create_template_resp['id'] - docker_args = (f'bash -c \'echo {encoded} | base64 --decode > init.sh; ' - f'bash init.sh\'') - ports = (f'22/tcp,' - f'{custom_ports_str}' - f'{constants.SKY_REMOTE_RAY_DASHBOARD_PORT}/http,' - f'{constants.SKY_REMOTE_RAY_PORT}/http') - params = { 'name': name, 'image_name': image_name, @@ -184,7 +184,7 @@ def launch(name: str, instance_type: str, region: str, disk_size: int, 'min_memory_in_gb': gpu_specs['memoryInGb'] * gpu_quantity, 'gpu_count': gpu_quantity, 'country_code': region, - 'ports': ports, + 'ports': ports_str, 'support_public_ip': True, 'docker_args': docker_args, 'template_id': template_id, From 18c1e0b927c6288b904f9759e76af5d06c06d5f7 Mon Sep 17 00:00:00 2001 From: cblmemo Date: Fri, 27 Dec 2024 00:03:59 -0800 Subject: [PATCH 3/7] works --- docs/source/getting-started/installation.rst | 2 +- sky/adaptors/runpod.py | 8 ++ sky/provision/docker_utils.py | 11 ++- sky/provision/runpod/instance.py | 3 +- sky/provision/runpod/utils.py | 78 +++++++++++++------- sky/skylet/providers/command_runner.py | 12 ++- 6 files changed, 77 insertions(+), 37 deletions(-) diff --git a/docs/source/getting-started/installation.rst b/docs/source/getting-started/installation.rst index deb2307b67b..142edb01124 100644 --- a/docs/source/getting-started/installation.rst +++ b/docs/source/getting-started/installation.rst @@ -303,7 +303,7 @@ RunPod .. code-block:: shell - pip install "runpod>=1.5.1" + pip install "runpod>=1.6.1" runpod config diff --git a/sky/adaptors/runpod.py b/sky/adaptors/runpod.py index 84aadd3fffe..788072a6d96 100644 --- a/sky/adaptors/runpod.py +++ b/sky/adaptors/runpod.py @@ -6,3 +6,11 @@ 'runpod', import_error_message='Failed to import dependencies for RunPod. ' 'Try running: pip install "skypilot[runpod]"') + +_LAZY_MODULES = (runpod,) + + +@common.load_lazy_modules(_LAZY_MODULES) +def query_exception(): + """Query exception.""" + return runpod.error.QueryError diff --git a/sky/provision/docker_utils.py b/sky/provision/docker_utils.py index c55508ab41a..3f342b57ce7 100644 --- a/sky/provision/docker_utils.py +++ b/sky/provision/docker_utils.py @@ -38,6 +38,13 @@ class DockerLoginConfig: password: str server: str + def format_image(self, image: str) -> str: + """Format the image name with the server prefix.""" + server_prefix = f'{self.server}/' + if not image.startswith(server_prefix): + return f'{server_prefix}{image}' + return image + @classmethod def from_env_vars(cls, d: Dict[str, str]) -> 'DockerLoginConfig': return cls( @@ -220,9 +227,7 @@ def initialize(self) -> str: wait_for_docker_daemon=True) # We automatically add the server prefix to the image name if # the user did not add it. - server_prefix = f'{docker_login_config.server}/' - if not specific_image.startswith(server_prefix): - specific_image = f'{server_prefix}{specific_image}' + specific_image = docker_login_config.format_image(specific_image) if self.docker_config.get('pull_before_run', True): assert specific_image, ('Image must be included in config if ' + diff --git a/sky/provision/runpod/instance.py b/sky/provision/runpod/instance.py index 18c150b4221..d8e387eb63b 100644 --- a/sky/provision/runpod/instance.py +++ b/sky/provision/runpod/instance.py @@ -83,7 +83,8 @@ def run_instances(region: str, cluster_name_on_cloud: str, node_type = 'head' if head_instance_id is None else 'worker' try: instance_id = utils.launch( - name=f'{cluster_name_on_cloud}-{node_type}', + cluster_name=cluster_name_on_cloud, + node_type=node_type, instance_type=config.node_config['InstanceType'], region=region, disk_size=config.node_config['DiskSize'], diff --git a/sky/provision/runpod/utils.py b/sky/provision/runpod/utils.py index 9ac1b745147..e81968a874e 100644 --- a/sky/provision/runpod/utils.py +++ b/sky/provision/runpod/utils.py @@ -2,7 +2,8 @@ import base64 import time -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple +import uuid from sky import sky_logging from sky.adaptors import runpod @@ -101,15 +102,60 @@ def list_instances() -> Dict[str, Dict[str, Any]]: return instance_dict -def launch(name: str, instance_type: str, region: str, disk_size: int, - image_name: str, ports: Optional[List[int]], public_key: str, - preemptible: Optional[bool], bid_per_gpu: float, +def _create_template_for_docker_login( + cluster_name: str, image_name: str, + docker_login_config: Optional[Dict[str, + str]]) -> Tuple[str, Optional[str]]: + if docker_login_config is None: + return image_name, None + # We add a uuid here to avoid the name conflict for terminating and + # launching with the same cluster name. Please see the comments below + # for reason we cannot cleanup the old resources. + name = f'{cluster_name}-{str(uuid.uuid4())[:4]}' + login_config = docker_utils.DockerLoginConfig(**docker_login_config) + container_registry_auth_name = f'{name}-registry-auth' + container_template_name = f'{name}-docker-login-template' + # The `name` argument is only for display purpose and the registry server + # will be splitted from the docker image name (Tested with AWS ECR). + # Here we only need the username and password to create the registry auth. + # TODO(tian): RunPod python API does not provide a way to get the registry + # auth and template ID by the name, and the only way to get the ID is when + # we create it. So we use a separate auth and template per cluster. This + # also assumes that every cluster has only one node, so no extra worker + # nodes will reuse the same auth and template name. + # TODO(tian): RunPod python API does not provide a way to delete the + # template. So we skip the deletion of template for now. We should + # implement this once they provide the API. + # TODO(tian): We also skipped the deletion of the auth for now, as the + # RunPod python API does not provide a way to delete the auth with the + # name (nor to get the id by the name), which requires we store the id + # at creation somewhere, and returning this value to outer caller will + # increase the call chain complexity. We should implement this once they + # provide the API. + create_auth_resp = runpod.runpod.create_container_registry_auth( + name=container_registry_auth_name, + username=login_config.username, + password=login_config.password, + ) + registry_auth_id = create_auth_resp['id'] + create_template_resp = runpod.runpod.create_template( + name=container_template_name, + image_name=None, + registry_auth_id=registry_auth_id, + ) + return login_config.format_image(image_name), create_template_resp['id'] + + +def launch(cluster_name: str, node_type: str, instance_type: str, region: str, + disk_size: int, image_name: str, ports: Optional[List[int]], + public_key: str, preemptible: Optional[bool], bid_per_gpu: float, docker_login_config: Optional[Dict[str, str]]) -> str: """Launches an instance with the given parameters. Converts the instance_type to the RunPod GPU name, finds the specs for the GPU, and launches the instance. """ + name = f'{cluster_name}-{node_type}' gpu_type = GPU_NAME_MAP[instance_type.split('_')[1]] gpu_quantity = int(instance_type.split('_')[0].replace('x', '')) cloud_type = instance_type.split('_')[2] @@ -153,30 +199,12 @@ def launch(name: str, instance_type: str, region: str, disk_size: int, f'{constants.SKY_REMOTE_RAY_DASHBOARD_PORT}/http,' f'{constants.SKY_REMOTE_RAY_PORT}/http') - template_id = None - if docker_login_config is not None: - login_config = docker_utils.DockerLoginConfig(**docker_login_config) - # TODO(tian): The `name` argument seems only for display purpose but - # not specifying the registry server. Double check if that works for - # registries other than Docker Hub. - # TODO(tian): Delete the registry auth and template after the instance - # is terminated. - create_auth_resp = runpod.runpod.create_container_registry_auth( - name=f'{name}-registry-auth', - username=login_config.username, - password=login_config.password, - ) - registry_auth_id = create_auth_resp['id'] - create_template_resp = runpod.runpod.create_template( - name=f'{name}-template', - image_name=image_name, - registry_auth_id=registry_auth_id, - ) - template_id = create_template_resp['id'] + image_name_formatted, template_id = (_create_template_for_docker_login( + cluster_name, image_name, docker_login_config)) params = { 'name': name, - 'image_name': image_name, + 'image_name': image_name_formatted, 'gpu_type_id': gpu_type, 'cloud_type': cloud_type, 'container_disk_in_gb': disk_size, diff --git a/sky/skylet/providers/command_runner.py b/sky/skylet/providers/command_runner.py index 4f66ef54383..16dbc4d2668 100644 --- a/sky/skylet/providers/command_runner.py +++ b/sky/skylet/providers/command_runner.py @@ -25,7 +25,7 @@ def docker_start_cmds( docker_cmd, ): """Generating docker start command without --rm. - + The code is borrowed from `ray.autoscaler._private.docker`. Changes we made: @@ -159,19 +159,17 @@ def run_init(self, *, as_head: bool, file_mounts: Dict[str, str], return True # SkyPilot: Docker login if user specified a private docker registry. - if "docker_login_config" in self.docker_config: + if 'docker_login_config' in self.docker_config: # TODO(tian): Maybe support a command to get the login password? - docker_login_config: docker_utils.DockerLoginConfig = self.docker_config[ - "docker_login_config"] + docker_login_config: docker_utils.DockerLoginConfig = ( + self.docker_config['docker_login_config']) self._run_with_retry( f'{self.docker_cmd} login --username ' f'{docker_login_config.username} --password ' f'{docker_login_config.password} {docker_login_config.server}') # We automatically add the server prefix to the image name if # the user did not add it. - server_prefix = f'{docker_login_config.server}/' - if not specific_image.startswith(server_prefix): - specific_image = f'{server_prefix}{specific_image}' + specific_image = docker_login_config.format_image(specific_image) if self.docker_config.get('pull_before_run', True): assert specific_image, ('Image must be included in config if ' From 8236882916b15a9b9668dd44094a8efd9e9858af Mon Sep 17 00:00:00 2001 From: cblmemo Date: Fri, 27 Dec 2024 00:04:54 -0800 Subject: [PATCH 4/7] remove unnecessary --- sky/adaptors/runpod.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/sky/adaptors/runpod.py b/sky/adaptors/runpod.py index 788072a6d96..84aadd3fffe 100644 --- a/sky/adaptors/runpod.py +++ b/sky/adaptors/runpod.py @@ -6,11 +6,3 @@ 'runpod', import_error_message='Failed to import dependencies for RunPod. ' 'Try running: pip install "skypilot[runpod]"') - -_LAZY_MODULES = (runpod,) - - -@common.load_lazy_modules(_LAZY_MODULES) -def query_exception(): - """Query exception.""" - return runpod.error.QueryError From 974239ce000f0d6c0590b94b5371115de511bd74 Mon Sep 17 00:00:00 2001 From: cblmemo Date: Fri, 27 Dec 2024 01:28:38 -0800 Subject: [PATCH 5/7] delete template and registry after termination --- sky/backends/cloud_vm_ray_backend.py | 11 ++++ sky/provision/common.py | 1 + sky/provision/runpod/instance.py | 18 +++++-- sky/provision/runpod/utils.py | 78 ++++++++++++++++++---------- 4 files changed, 77 insertions(+), 31 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 9d94f469df3..64c3008f235 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -1570,6 +1570,17 @@ def _retry_zones( config_dict['provision_record'] = provision_record config_dict['resources_vars'] = resources_vars config_dict['handle'] = handle + if provision_record.ephemeral_resources: + # Some ephemeral resources are created during the launch + # process. Add them to the provider config so that they + # can be cleaned up later. + original_config_content = common_utils.read_yaml( + cluster_config_file) + original_config_content['provider'][ + 'ephemeral_resources'] = ( + provision_record.ephemeral_resources) + common_utils.dump_yaml(cluster_config_file, + original_config_content) return config_dict except provision_common.StopFailoverError: with ux_utils.print_exception_no_traceback(): diff --git a/sky/provision/common.py b/sky/provision/common.py index 1d58174090e..f50f3a31822 100644 --- a/sky/provision/common.py +++ b/sky/provision/common.py @@ -78,6 +78,7 @@ class ProvisionRecord: resumed_instance_ids: List[InstanceId] # The IDs of all just created instances. created_instance_ids: List[InstanceId] + ephemeral_resources: List[Any] = dataclasses.field(default_factory=list) def is_instance_just_booted(self, instance_id: InstanceId) -> bool: """Whether or not the instance is just booted. diff --git a/sky/provision/runpod/instance.py b/sky/provision/runpod/instance.py index d8e387eb63b..23a97efd305 100644 --- a/sky/provision/runpod/instance.py +++ b/sky/provision/runpod/instance.py @@ -79,10 +79,11 @@ def run_instances(region: str, cluster_name_on_cloud: str, created_instance_ids=[]) created_instance_ids = [] + ephemeral_resources = [] for _ in range(to_start_count): node_type = 'head' if head_instance_id is None else 'worker' try: - instance_id = utils.launch( + instance_id, ers = utils.launch( cluster_name=cluster_name_on_cloud, node_type=node_type, instance_type=config.node_config['InstanceType'], @@ -96,6 +97,9 @@ def run_instances(region: str, cluster_name_on_cloud: str, docker_login_config=config.provider_config.get( 'docker_login_config'), ) + for er in ers: + if er is not None: + ephemeral_resources.append(er) except Exception as e: # pylint: disable=broad-except logger.warning(f'run_instances error: {e}') raise @@ -124,7 +128,8 @@ def run_instances(region: str, cluster_name_on_cloud: str, zone=None, head_instance_id=head_instance_id, resumed_instance_ids=[], - created_instance_ids=created_instance_ids) + created_instance_ids=created_instance_ids, + ephemeral_resources=ephemeral_resources) def wait_instances(region: str, cluster_name_on_cloud: str, @@ -146,7 +151,8 @@ def terminate_instances( worker_only: bool = False, ) -> None: """See sky/provision/__init__.py""" - del provider_config # unused + assert provider_config is not None, (cluster_name_on_cloud, provider_config) + ephemeral_resources = provider_config.get('ephemeral_resources', []) instances = _filter_instances(cluster_name_on_cloud, None) for inst_id, inst in instances.items(): logger.debug(f'Terminating instance {inst_id}: {inst}') @@ -160,6 +166,12 @@ def terminate_instances( f'Failed to terminate instance {inst_id}: ' f'{common_utils.format_exception(e, use_bracket=False)}' ) from e + if ephemeral_resources: + # See sky/provision/runpod/utils.py::launch for details + assert len(ephemeral_resources) == 2, ephemeral_resources + template_name, registry_auth_id = ephemeral_resources + utils.delete_pod_template(template_name) + utils.delete_register_auth(registry_auth_id) def get_cluster_info( diff --git a/sky/provision/runpod/utils.py b/sky/provision/runpod/utils.py index e81968a874e..f9656720a6f 100644 --- a/sky/provision/runpod/utils.py +++ b/sky/provision/runpod/utils.py @@ -3,7 +3,6 @@ import base64 import time from typing import Any, Dict, List, Optional, Tuple -import uuid from sky import sky_logging from sky.adaptors import runpod @@ -102,19 +101,44 @@ def list_instances() -> Dict[str, Dict[str, Any]]: return instance_dict +def delete_pod_template(template_name: str) -> None: + """Deletes a pod template.""" + try: + runpod.runpod.api.graphql.run_graphql_query( + f'mutation {{deleteTemplate(templateName: "{template_name}")}}') + except runpod.runpod.error.QueryError as e: + logger.warning(f'Failed to delete template {template_name}: {e}' + 'Please delete it manually.') + + +def delete_register_auth(registry_auth_id: str) -> None: + """Deletes a registry auth.""" + try: + runpod.runpod.delete_container_registry_auth(registry_auth_id) + except runpod.runpod.error.QueryError as e: + logger.warning(f'Failed to delete registry auth {registry_auth_id}: {e}' + 'Please delete it manually.') + + def _create_template_for_docker_login( - cluster_name: str, image_name: str, - docker_login_config: Optional[Dict[str, - str]]) -> Tuple[str, Optional[str]]: + cluster_name: str, + image_name: str, + docker_login_config: Optional[Dict[str, str]], +) -> Tuple[str, Optional[str], Optional[str], Optional[str]]: + """Creates a template for the given image with the docker login config. + + Returns: + formatted_image_name: The formatted image name. + # following fields are None for no docker login config. + template_id: The template ID. + template_name: The template name. + registry_auth_id: The registry auth ID. + """ if docker_login_config is None: - return image_name, None - # We add a uuid here to avoid the name conflict for terminating and - # launching with the same cluster name. Please see the comments below - # for reason we cannot cleanup the old resources. - name = f'{cluster_name}-{str(uuid.uuid4())[:4]}' + return image_name, None, None, None login_config = docker_utils.DockerLoginConfig(**docker_login_config) - container_registry_auth_name = f'{name}-registry-auth' - container_template_name = f'{name}-docker-login-template' + container_registry_auth_name = f'{cluster_name}-registry-auth' + container_template_name = f'{cluster_name}-docker-login-template' # The `name` argument is only for display purpose and the registry server # will be splitted from the docker image name (Tested with AWS ECR). # Here we only need the username and password to create the registry auth. @@ -123,15 +147,6 @@ def _create_template_for_docker_login( # we create it. So we use a separate auth and template per cluster. This # also assumes that every cluster has only one node, so no extra worker # nodes will reuse the same auth and template name. - # TODO(tian): RunPod python API does not provide a way to delete the - # template. So we skip the deletion of template for now. We should - # implement this once they provide the API. - # TODO(tian): We also skipped the deletion of the auth for now, as the - # RunPod python API does not provide a way to delete the auth with the - # name (nor to get the id by the name), which requires we store the id - # at creation somewhere, and returning this value to outer caller will - # increase the call chain complexity. We should implement this once they - # provide the API. create_auth_resp = runpod.runpod.create_container_registry_auth( name=container_registry_auth_name, username=login_config.username, @@ -143,17 +158,23 @@ def _create_template_for_docker_login( image_name=None, registry_auth_id=registry_auth_id, ) - return login_config.format_image(image_name), create_template_resp['id'] + return (login_config.format_image(image_name), create_template_resp['id'], + container_template_name, registry_auth_id) -def launch(cluster_name: str, node_type: str, instance_type: str, region: str, - disk_size: int, image_name: str, ports: Optional[List[int]], - public_key: str, preemptible: Optional[bool], bid_per_gpu: float, - docker_login_config: Optional[Dict[str, str]]) -> str: +def launch( + cluster_name: str, node_type: str, instance_type: str, region: str, + disk_size: int, image_name: str, ports: Optional[List[int]], + public_key: str, preemptible: Optional[bool], bid_per_gpu: float, + docker_login_config: Optional[Dict[str, str]]) -> Tuple[str, List[Any]]: """Launches an instance with the given parameters. Converts the instance_type to the RunPod GPU name, finds the specs for the GPU, and launches the instance. + + Returns: + instance_id: The instance ID. + ephemeral_resources: A list of ephemeral resources. """ name = f'{cluster_name}-{node_type}' gpu_type = GPU_NAME_MAP[instance_type.split('_')[1]] @@ -199,8 +220,9 @@ def launch(cluster_name: str, node_type: str, instance_type: str, region: str, f'{constants.SKY_REMOTE_RAY_DASHBOARD_PORT}/http,' f'{constants.SKY_REMOTE_RAY_PORT}/http') - image_name_formatted, template_id = (_create_template_for_docker_login( - cluster_name, image_name, docker_login_config)) + image_name_formatted, template_id, template_name, registry_auth_id = ( + _create_template_for_docker_login(cluster_name, image_name, + docker_login_config)) params = { 'name': name, @@ -226,7 +248,7 @@ def launch(cluster_name: str, node_type: str, instance_type: str, region: str, **params, ) - return new_instance['id'] + return new_instance['id'], [template_name, registry_auth_id] def remove(instance_id: str) -> None: From 54497c7421c0ab259f2182226e10f9e77860cd53 Mon Sep 17 00:00:00 2001 From: cblmemo Date: Mon, 6 Jan 2025 18:42:49 -0800 Subject: [PATCH 6/7] move to graphql api & remove ephemeral resourcdes --- sky/backends/cloud_vm_ray_backend.py | 11 --- sky/provision/common.py | 1 - sky/provision/runpod/instance.py | 18 ++-- sky/provision/runpod/utils.py | 127 +++++++++++++++++++++++---- 4 files changed, 115 insertions(+), 42 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 64c3008f235..9d94f469df3 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -1570,17 +1570,6 @@ def _retry_zones( config_dict['provision_record'] = provision_record config_dict['resources_vars'] = resources_vars config_dict['handle'] = handle - if provision_record.ephemeral_resources: - # Some ephemeral resources are created during the launch - # process. Add them to the provider config so that they - # can be cleaned up later. - original_config_content = common_utils.read_yaml( - cluster_config_file) - original_config_content['provider'][ - 'ephemeral_resources'] = ( - provision_record.ephemeral_resources) - common_utils.dump_yaml(cluster_config_file, - original_config_content) return config_dict except provision_common.StopFailoverError: with ux_utils.print_exception_no_traceback(): diff --git a/sky/provision/common.py b/sky/provision/common.py index f50f3a31822..1d58174090e 100644 --- a/sky/provision/common.py +++ b/sky/provision/common.py @@ -78,7 +78,6 @@ class ProvisionRecord: resumed_instance_ids: List[InstanceId] # The IDs of all just created instances. created_instance_ids: List[InstanceId] - ephemeral_resources: List[Any] = dataclasses.field(default_factory=list) def is_instance_just_booted(self, instance_id: InstanceId) -> bool: """Whether or not the instance is just booted. diff --git a/sky/provision/runpod/instance.py b/sky/provision/runpod/instance.py index 23a97efd305..d81ed95d97b 100644 --- a/sky/provision/runpod/instance.py +++ b/sky/provision/runpod/instance.py @@ -79,11 +79,10 @@ def run_instances(region: str, cluster_name_on_cloud: str, created_instance_ids=[]) created_instance_ids = [] - ephemeral_resources = [] for _ in range(to_start_count): node_type = 'head' if head_instance_id is None else 'worker' try: - instance_id, ers = utils.launch( + instance_id = utils.launch( cluster_name=cluster_name_on_cloud, node_type=node_type, instance_type=config.node_config['InstanceType'], @@ -97,9 +96,6 @@ def run_instances(region: str, cluster_name_on_cloud: str, docker_login_config=config.provider_config.get( 'docker_login_config'), ) - for er in ers: - if er is not None: - ephemeral_resources.append(er) except Exception as e: # pylint: disable=broad-except logger.warning(f'run_instances error: {e}') raise @@ -128,8 +124,7 @@ def run_instances(region: str, cluster_name_on_cloud: str, zone=None, head_instance_id=head_instance_id, resumed_instance_ids=[], - created_instance_ids=created_instance_ids, - ephemeral_resources=ephemeral_resources) + created_instance_ids=created_instance_ids) def wait_instances(region: str, cluster_name_on_cloud: str, @@ -152,8 +147,9 @@ def terminate_instances( ) -> None: """See sky/provision/__init__.py""" assert provider_config is not None, (cluster_name_on_cloud, provider_config) - ephemeral_resources = provider_config.get('ephemeral_resources', []) instances = _filter_instances(cluster_name_on_cloud, None) + template_name, registry_auth_id = utils.get_registry_auth_resources( + cluster_name_on_cloud) for inst_id, inst in instances.items(): logger.debug(f'Terminating instance {inst_id}: {inst}') if worker_only and inst['name'].endswith('-head'): @@ -166,11 +162,9 @@ def terminate_instances( f'Failed to terminate instance {inst_id}: ' f'{common_utils.format_exception(e, use_bracket=False)}' ) from e - if ephemeral_resources: - # See sky/provision/runpod/utils.py::launch for details - assert len(ephemeral_resources) == 2, ephemeral_resources - template_name, registry_auth_id = ephemeral_resources + if template_name is not None: utils.delete_pod_template(template_name) + if registry_auth_id is not None: utils.delete_register_auth(registry_auth_id) diff --git a/sky/provision/runpod/utils.py b/sky/provision/runpod/utils.py index f9656720a6f..e85563a6825 100644 --- a/sky/provision/runpod/utils.py +++ b/sky/provision/runpod/utils.py @@ -13,6 +13,50 @@ logger = sky_logging.init_logger(__name__) +# Adapted from runpod.api.queries.pods.py::QUERY_POD. +# Adding container registry auth id to the query. +_QUERY_POD = """ +query myPods { + myself { + pods { + id + containerDiskInGb + containerRegistryAuthId + costPerHr + desiredStatus + dockerArgs + dockerId + env + gpuCount + imageName + lastStatusChange + machineId + memoryInGb + name + podType + port + ports + uptimeSeconds + vcpuCount + volumeInGb + volumeMountPath + runtime { + ports{ + ip + isIpPublic + privatePort + publicPort + type + } + } + machine { + gpuDisplayName + } + } + } +} +""" + GPU_NAME_MAP = { 'A100-80GB': 'NVIDIA A100 80GB PCIe', 'A100-40GB': 'NVIDIA A100-PCIE-40GB', @@ -48,6 +92,11 @@ } +def _construct_docker_login_template_name(cluster_name: str) -> str: + """Constructs the registry auth template name.""" + return f'{cluster_name}-docker-login-template' + + def retry(func): """Decorator to retry a function.""" @@ -67,9 +116,38 @@ def wrapper(*args, **kwargs): return wrapper +def _sky_get_pods() -> dict: + """List all pods with extra registry auth information. + + Adapted from runpod.get_pods() to include containerRegistryAuthId. + """ + raw_return = runpod.runpod.api.graphql.run_graphql_query(_QUERY_POD) + cleaned_return = raw_return['data']['myself']['pods'] + return cleaned_return + + +_QUERY_POD_TEMPLATE_WITH_REGISTRY_AUTH = """ +query myself { + myself { + podTemplates { + name + containerRegistryAuthId + } + } +} +""" + + +def _list_pod_templates() -> dict: + """List all pod templates.""" + raw_return = runpod.runpod.api.graphql.run_graphql_query( + _QUERY_POD_TEMPLATE_WITH_REGISTRY_AUTH) + return raw_return['data']['myself']['podTemplates'] + + def list_instances() -> Dict[str, Dict[str, Any]]: """Lists instances associated with API key.""" - instances = runpod.runpod.get_pods() + instances = _sky_get_pods() instance_dict: Dict[str, Dict[str, Any]] = {} for instance in instances: @@ -124,21 +202,20 @@ def _create_template_for_docker_login( cluster_name: str, image_name: str, docker_login_config: Optional[Dict[str, str]], -) -> Tuple[str, Optional[str], Optional[str], Optional[str]]: +) -> Tuple[str, Optional[str]]: """Creates a template for the given image with the docker login config. Returns: formatted_image_name: The formatted image name. # following fields are None for no docker login config. template_id: The template ID. - template_name: The template name. - registry_auth_id: The registry auth ID. """ if docker_login_config is None: - return image_name, None, None, None + return image_name, None login_config = docker_utils.DockerLoginConfig(**docker_login_config) container_registry_auth_name = f'{cluster_name}-registry-auth' - container_template_name = f'{cluster_name}-docker-login-template' + container_template_name = _construct_docker_login_template_name( + cluster_name) # The `name` argument is only for display purpose and the registry server # will be splitted from the docker image name (Tested with AWS ECR). # Here we only need the username and password to create the registry auth. @@ -158,15 +235,13 @@ def _create_template_for_docker_login( image_name=None, registry_auth_id=registry_auth_id, ) - return (login_config.format_image(image_name), create_template_resp['id'], - container_template_name, registry_auth_id) + return login_config.format_image(image_name), create_template_resp['id'] -def launch( - cluster_name: str, node_type: str, instance_type: str, region: str, - disk_size: int, image_name: str, ports: Optional[List[int]], - public_key: str, preemptible: Optional[bool], bid_per_gpu: float, - docker_login_config: Optional[Dict[str, str]]) -> Tuple[str, List[Any]]: +def launch(cluster_name: str, node_type: str, instance_type: str, region: str, + disk_size: int, image_name: str, ports: Optional[List[int]], + public_key: str, preemptible: Optional[bool], bid_per_gpu: float, + docker_login_config: Optional[Dict[str, str]]) -> str: """Launches an instance with the given parameters. Converts the instance_type to the RunPod GPU name, finds the specs for the @@ -174,7 +249,6 @@ def launch( Returns: instance_id: The instance ID. - ephemeral_resources: A list of ephemeral resources. """ name = f'{cluster_name}-{node_type}' gpu_type = GPU_NAME_MAP[instance_type.split('_')[1]] @@ -220,9 +294,8 @@ def launch( f'{constants.SKY_REMOTE_RAY_DASHBOARD_PORT}/http,' f'{constants.SKY_REMOTE_RAY_PORT}/http') - image_name_formatted, template_id, template_name, registry_auth_id = ( - _create_template_for_docker_login(cluster_name, image_name, - docker_login_config)) + image_name_formatted, template_id = (_create_template_for_docker_login( + cluster_name, image_name, docker_login_config)) params = { 'name': name, @@ -248,7 +321,25 @@ def launch( **params, ) - return new_instance['id'], [template_name, registry_auth_id] + return new_instance['id'] + + +def get_registry_auth_resources( + cluster_name: str) -> Tuple[Optional[str], Optional[str]]: + """Gets the registry auth resources.""" + container_registry_auth_name = _construct_docker_login_template_name( + cluster_name) + for template in _list_pod_templates(): + if template['name'] == container_registry_auth_name: + return container_registry_auth_name, template[ + 'containerRegistryAuthId'] + return None, None + + +def cleanup_registry_auth(template_name: str, registry_auth_id: str) -> None: + """Cleans up the registry auth.""" + delete_pod_template(template_name) + delete_register_auth(registry_auth_id) def remove(instance_id: str) -> None: From 895b340ab84cb8c73d06e0d60fe932ab440d3830 Mon Sep 17 00:00:00 2001 From: cblmemo Date: Mon, 6 Jan 2025 18:50:09 -0800 Subject: [PATCH 7/7] nits --- sky/provision/runpod/instance.py | 2 +- sky/provision/runpod/utils.py | 114 ++++++++++++++----------------- 2 files changed, 54 insertions(+), 62 deletions(-) diff --git a/sky/provision/runpod/instance.py b/sky/provision/runpod/instance.py index d81ed95d97b..9e57887c3f1 100644 --- a/sky/provision/runpod/instance.py +++ b/sky/provision/runpod/instance.py @@ -146,7 +146,7 @@ def terminate_instances( worker_only: bool = False, ) -> None: """See sky/provision/__init__.py""" - assert provider_config is not None, (cluster_name_on_cloud, provider_config) + del provider_config # unused instances = _filter_instances(cluster_name_on_cloud, None) template_name, registry_auth_id = utils.get_registry_auth_resources( cluster_name_on_cloud) diff --git a/sky/provision/runpod/utils.py b/sky/provision/runpod/utils.py index e85563a6825..6600cfd6198 100644 --- a/sky/provision/runpod/utils.py +++ b/sky/provision/runpod/utils.py @@ -13,50 +13,6 @@ logger = sky_logging.init_logger(__name__) -# Adapted from runpod.api.queries.pods.py::QUERY_POD. -# Adding container registry auth id to the query. -_QUERY_POD = """ -query myPods { - myself { - pods { - id - containerDiskInGb - containerRegistryAuthId - costPerHr - desiredStatus - dockerArgs - dockerId - env - gpuCount - imageName - lastStatusChange - machineId - memoryInGb - name - podType - port - ports - uptimeSeconds - vcpuCount - volumeInGb - volumeMountPath - runtime { - ports{ - ip - isIpPublic - privatePort - publicPort - type - } - } - machine { - gpuDisplayName - } - } - } -} -""" - GPU_NAME_MAP = { 'A100-80GB': 'NVIDIA A100 80GB PCIe', 'A100-40GB': 'NVIDIA A100-PCIE-40GB', @@ -116,6 +72,51 @@ def wrapper(*args, **kwargs): return wrapper +# Adapted from runpod.api.queries.pods.py::QUERY_POD. +# Adding containerRegistryAuthId to the query. +_QUERY_POD = """ +query myPods { + myself { + pods { + id + containerDiskInGb + containerRegistryAuthId + costPerHr + desiredStatus + dockerArgs + dockerId + env + gpuCount + imageName + lastStatusChange + machineId + memoryInGb + name + podType + port + ports + uptimeSeconds + vcpuCount + volumeInGb + volumeMountPath + runtime { + ports{ + ip + isIpPublic + privatePort + publicPort + type + } + } + machine { + gpuDisplayName + } + } + } +} +""" + + def _sky_get_pods() -> dict: """List all pods with extra registry auth information. @@ -138,7 +139,7 @@ def _sky_get_pods() -> dict: """ -def _list_pod_templates() -> dict: +def _list_pod_templates_with_container_registry() -> dict: """List all pod templates.""" raw_return = runpod.runpod.api.graphql.run_graphql_query( _QUERY_POD_TEMPLATE_WITH_REGISTRY_AUTH) @@ -207,8 +208,7 @@ def _create_template_for_docker_login( Returns: formatted_image_name: The formatted image name. - # following fields are None for no docker login config. - template_id: The template ID. + template_id: The template ID. None for no docker login config. """ if docker_login_config is None: return image_name, None @@ -219,11 +219,9 @@ def _create_template_for_docker_login( # The `name` argument is only for display purpose and the registry server # will be splitted from the docker image name (Tested with AWS ECR). # Here we only need the username and password to create the registry auth. - # TODO(tian): RunPod python API does not provide a way to get the registry - # auth and template ID by the name, and the only way to get the ID is when - # we create it. So we use a separate auth and template per cluster. This - # also assumes that every cluster has only one node, so no extra worker - # nodes will reuse the same auth and template name. + # TODO(tian): Now we create a template and a registry auth for each cluster. + # Consider create one for each server and reuse them. Challenges including + # calculate the reference count and delete them when no longer needed. create_auth_resp = runpod.runpod.create_container_registry_auth( name=container_registry_auth_name, username=login_config.username, @@ -294,8 +292,8 @@ def launch(cluster_name: str, node_type: str, instance_type: str, region: str, f'{constants.SKY_REMOTE_RAY_DASHBOARD_PORT}/http,' f'{constants.SKY_REMOTE_RAY_PORT}/http') - image_name_formatted, template_id = (_create_template_for_docker_login( - cluster_name, image_name, docker_login_config)) + image_name_formatted, template_id = _create_template_for_docker_login( + cluster_name, image_name, docker_login_config) params = { 'name': name, @@ -329,19 +327,13 @@ def get_registry_auth_resources( """Gets the registry auth resources.""" container_registry_auth_name = _construct_docker_login_template_name( cluster_name) - for template in _list_pod_templates(): + for template in _list_pod_templates_with_container_registry(): if template['name'] == container_registry_auth_name: return container_registry_auth_name, template[ 'containerRegistryAuthId'] return None, None -def cleanup_registry_auth(template_name: str, registry_auth_id: str) -> None: - """Cleans up the registry auth.""" - delete_pod_template(template_name) - delete_register_auth(registry_auth_id) - - def remove(instance_id: str) -> None: """Terminates the given instance.""" runpod.runpod.terminate_pod(instance_id)