From 023381b3fb04f05afa2a7a5e96f1d0464675d4ae Mon Sep 17 00:00:00 2001 From: Cohen-J-Omer Date: Thu, 23 Mar 2023 16:14:00 +0200 Subject: [PATCH] delete recyclable ips CloudFunctions&vpc_provider --- sky/skylet/providers/ibm/node_provider.py | 3 +- sky/skylet/providers/ibm/utils.py | 2 +- sky/skylet/providers/ibm/vpc_provider.py | 97 ++++++++++++++++------- 3 files changed, 69 insertions(+), 33 deletions(-) diff --git a/sky/skylet/providers/ibm/node_provider.py b/sky/skylet/providers/ibm/node_provider.py index 10bee0e0599c..babffb23077d 100644 --- a/sky/skylet/providers/ibm/node_provider.py +++ b/sky/skylet/providers/ibm/node_provider.py @@ -48,7 +48,7 @@ TAG_RAY_FILE_MOUNTS_CONTENTS, ) from sky.skylet.providers.ibm.vpc_provider import IBMVPCProvider -from sky.skylet.providers.ibm.utils import get_logger +from sky.skylet.providers.ibm.utils import get_logger, RAY_RECYCLABLE logger = get_logger("node_provider_") @@ -58,7 +58,6 @@ VOLUME_TIER_NAME_DEFAULT = "general-purpose" # identifies resources created by this package. # these resources are deleted alongside the node. -RAY_RECYCLABLE = "ray-recyclable" VPC_TAGS = ".sky-vpc-tags" diff --git a/sky/skylet/providers/ibm/utils.py b/sky/skylet/providers/ibm/utils.py index f68137949486..11dddf3bfc77 100644 --- a/sky/skylet/providers/ibm/utils.py +++ b/sky/skylet/providers/ibm/utils.py @@ -3,7 +3,7 @@ import logging import os import time - +RAY_RECYCLABLE = "ray-recyclable" def get_logger(caller_name): """ diff --git a/sky/skylet/providers/ibm/vpc_provider.py b/sky/skylet/providers/ibm/vpc_provider.py index 697fb204a567..027c94401c2b 100644 --- a/sky/skylet/providers/ibm/vpc_provider.py +++ b/sky/skylet/providers/ibm/vpc_provider.py @@ -12,7 +12,7 @@ import json import textwrap from sky.adaptors import ibm -from sky.skylet.providers.ibm.utils import get_logger +from sky.skylet.providers.ibm.utils import get_logger, RAY_RECYCLABLE from ibm_cloud_sdk_core import ApiException # pylint: disable=line-too-long @@ -294,16 +294,30 @@ def _poll_vpc_contains_vms(vpc_id): "continue to delete VPC." ) - # Delete VSIs if exist + def _del_instance(vm_data): + # first delete ips created by node_provider + nic_id = vm_data["network_interfaces"][0]["id"] + res = vpc_client.list_instance_network_interface_floating_ips( + vm_data["id"], nic_id + ).get_result() + floating_ips = res.get("floating_ips", []) + for ip in floating_ips: + if ip["name"].startswith(RAY_RECYCLABLE): + logger.info(f"Deleting IP: {ip['id']}") + vpc_client.delete_floating_ip(ip["id"]) + logger.info(f"Deleting VM: {vm_data['id']}") + vpc_client.delete_instance(id=vm_data["id"]) + # pylint: disable=line-too-long E1136 res = vpc_client.list_instances(vpc_id=vpc_id).get_result() num_instances = res["total_count"] + + # Delete VSIs if exist if num_instances: - vsi_ids = [vsi_id["id"] for vsi_id in res["instances"]] - logger.info(f"Deleting VMs: {vsi_ids}") + instances = res["instances"] with ThreadPoolExecutor(num_instances) as ex: for i in range(num_instances): - ex.submit(vpc_client.delete_instance, vsi_ids[i]) + ex.submit(_del_instance, instances[i]) # wait until all vms are deleted to proceed _poll_vpc_contains_vms(vpc_id) @@ -476,18 +490,18 @@ def __init__(self, resource_group_id, vpc_id, vpc_region) -> None: self.vpc_region = vpc_region function_code = textwrap.dedent( - """ + """ import subprocess - import os import time - + from concurrent.futures import ThreadPoolExecutor + RAY_RECYCLABLE = "ray-recyclable" ibm_vpc_client = None # modules installed and imported entry point ibm_vpc = None ibm_cloud_sdk_core = None - + def get_vpc_data(vpc_id): - + if not vpc_id: return None try: vpc_data = ibm_vpc_client.get_vpc(vpc_id).result @@ -541,26 +555,49 @@ def delete_gateways(vpc_id): # will retry until cloud functions timeout. time.sleep(5) - def delete_instances(vpc_id): - def _poll_instance_exists(instance_id): - tries = 20 + def delete_vms(vpc_id): + def _poll_vpc_contains_vms(vpc_id): + tries = 60 sleep_interval = 3 while tries: - try: - instance_data = ibm_vpc_client.get_instance(instance_id).get_result() - except Exception: - print(f'Deleted VM instance with id: {instance_id}') + # list_instances() never raise an exception, check values instead + res = ibm_vpc_client.list_instances(vpc_id=vpc_id).get_result() + if not res["total_count"]: return True - tries -= 1 - time.sleep(sleep_interval) - print(f"Failed to delete instance within expected time frame of {(tries*sleep_interval)/60} minutes.") - return False + else: + tries -= 1 + time.sleep(sleep_interval) + raise Exception( + "Failed to delete VPC's instances within " + "the expected time frame. Cannot " + "continue to delete VPC." + ) - instances = ibm_vpc_client.list_instances(vpc_id=vpc_id).get_result()['instances'] - instances_ids = [instance['id'] for instance in instances] - for id in instances_ids: - ibm_vpc_client.delete_instance(id=id).get_result() - _poll_instance_exists(id) + def _del_instance(vm_data): + # first delete ips created by node_provider + nic_id = vm_data["network_interfaces"][0]["id"] + res = ibm_vpc_client.list_instance_network_interface_floating_ips( + vm_data["id"], nic_id + ).get_result() + floating_ips = res.get("floating_ips", []) + for ip in floating_ips: + if ip["name"].startswith(RAY_RECYCLABLE): + print(f"Deleting IP: {ip['id']}") + ibm_vpc_client.delete_floating_ip(ip["id"]) + print(f"Deleting VM: {vm_data['id']}") + ibm_vpc_client.delete_instance(id=vm_data["id"]) + + res = ibm_vpc_client.list_instances(vpc_id=vpc_id).get_result() + num_instances = res["total_count"] + + # Delete VSIs if exist + if num_instances: + instances = res["instances"] + with ThreadPoolExecutor(num_instances) as ex: + for i in range(num_instances): + ex.submit(_del_instance, instances[i]) + # wait until all vms are deleted to proceed + _poll_vpc_contains_vms(vpc_id) def delete_unbound_vpc(vpc_id): deleting_resource = True @@ -583,7 +620,7 @@ def delete_vpc(vpc_id): print((f"Failed to find a VPC with id={vpc_id}")) return print(f"Deleting vpc:{vpc_data['name']} with id:{vpc_id}") - delete_instances(vpc_data['id']) + delete_vms(vpc_id) delete_subnets(vpc_data) delete_gateways(vpc_id) delete_unbound_vpc(vpc_id) @@ -596,7 +633,7 @@ def install_package(package): pip_location_stdout = subprocess.run(['which', 'pip'], capture_output=True, text=True) pip_location = pip_location_stdout.stdout.strip() subprocess.call([pip_location, 'install', package], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) - + for package in ["ibm-vpc", "ibm-cloud-sdk-core"]: install_package(package) @@ -614,10 +651,10 @@ def install_package(package): raise Exception("VPC not found in any region") ibm_vpc_client.set_service_url(f'https://{region}.iaas.cloud.ibm.com/v1') - + delete_vpc(vpc_id=vpc_id) return {"Status": "Success"} - """ + """ ) def get_headers(self):