Skip to content

Commit

Permalink
delete recyclable ips CloudFunctions&vpc_provider
Browse files Browse the repository at this point in the history
  • Loading branch information
Cohen-J-Omer committed Mar 23, 2023
1 parent 4a01a59 commit 023381b
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 33 deletions.
3 changes: 1 addition & 2 deletions sky/skylet/providers/ibm/node_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
TAG_RAY_FILE_MOUNTS_CONTENTS,
)
from sky.skylet.providers.ibm.vpc_provider import IBMVPCProvider
from sky.skylet.providers.ibm.utils import get_logger
from sky.skylet.providers.ibm.utils import get_logger, RAY_RECYCLABLE

logger = get_logger("node_provider_")

Expand All @@ -58,7 +58,6 @@
VOLUME_TIER_NAME_DEFAULT = "general-purpose"
# identifies resources created by this package.
# these resources are deleted alongside the node.
RAY_RECYCLABLE = "ray-recyclable"
VPC_TAGS = ".sky-vpc-tags"


Expand Down
2 changes: 1 addition & 1 deletion sky/skylet/providers/ibm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import os
import time

RAY_RECYCLABLE = "ray-recyclable"

def get_logger(caller_name):
"""
Expand Down
97 changes: 67 additions & 30 deletions sky/skylet/providers/ibm/vpc_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import json
import textwrap
from sky.adaptors import ibm
from sky.skylet.providers.ibm.utils import get_logger
from sky.skylet.providers.ibm.utils import get_logger, RAY_RECYCLABLE
from ibm_cloud_sdk_core import ApiException

# pylint: disable=line-too-long
Expand Down Expand Up @@ -294,16 +294,30 @@ def _poll_vpc_contains_vms(vpc_id):
"continue to delete VPC."
)

# Delete VSIs if exist
def _del_instance(vm_data):
# first delete ips created by node_provider
nic_id = vm_data["network_interfaces"][0]["id"]
res = vpc_client.list_instance_network_interface_floating_ips(
vm_data["id"], nic_id
).get_result()
floating_ips = res.get("floating_ips", [])
for ip in floating_ips:
if ip["name"].startswith(RAY_RECYCLABLE):
logger.info(f"Deleting IP: {ip['id']}")
vpc_client.delete_floating_ip(ip["id"])
logger.info(f"Deleting VM: {vm_data['id']}")
vpc_client.delete_instance(id=vm_data["id"])

# pylint: disable=line-too-long E1136
res = vpc_client.list_instances(vpc_id=vpc_id).get_result()
num_instances = res["total_count"]

# Delete VSIs if exist
if num_instances:
vsi_ids = [vsi_id["id"] for vsi_id in res["instances"]]
logger.info(f"Deleting VMs: {vsi_ids}")
instances = res["instances"]
with ThreadPoolExecutor(num_instances) as ex:
for i in range(num_instances):
ex.submit(vpc_client.delete_instance, vsi_ids[i])
ex.submit(_del_instance, instances[i])
# wait until all vms are deleted to proceed
_poll_vpc_contains_vms(vpc_id)

Expand Down Expand Up @@ -476,18 +490,18 @@ def __init__(self, resource_group_id, vpc_id, vpc_region) -> None:
self.vpc_region = vpc_region

function_code = textwrap.dedent(
"""
"""
import subprocess
import os
import time
from concurrent.futures import ThreadPoolExecutor
RAY_RECYCLABLE = "ray-recyclable"
ibm_vpc_client = None
# modules installed and imported entry point
ibm_vpc = None
ibm_cloud_sdk_core = None
def get_vpc_data(vpc_id):
if not vpc_id: return None
try:
vpc_data = ibm_vpc_client.get_vpc(vpc_id).result
Expand Down Expand Up @@ -541,26 +555,49 @@ def delete_gateways(vpc_id):
# will retry until cloud functions timeout.
time.sleep(5)
def delete_instances(vpc_id):
def _poll_instance_exists(instance_id):
tries = 20
def delete_vms(vpc_id):
def _poll_vpc_contains_vms(vpc_id):
tries = 60
sleep_interval = 3
while tries:
try:
instance_data = ibm_vpc_client.get_instance(instance_id).get_result()
except Exception:
print(f'Deleted VM instance with id: {instance_id}')
# list_instances() never raise an exception, check values instead
res = ibm_vpc_client.list_instances(vpc_id=vpc_id).get_result()
if not res["total_count"]:
return True
tries -= 1
time.sleep(sleep_interval)
print(f"Failed to delete instance within expected time frame of {(tries*sleep_interval)/60} minutes.")
return False
else:
tries -= 1
time.sleep(sleep_interval)
raise Exception(
"Failed to delete VPC's instances within "
"the expected time frame. Cannot "
"continue to delete VPC."
)
instances = ibm_vpc_client.list_instances(vpc_id=vpc_id).get_result()['instances']
instances_ids = [instance['id'] for instance in instances]
for id in instances_ids:
ibm_vpc_client.delete_instance(id=id).get_result()
_poll_instance_exists(id)
def _del_instance(vm_data):
# first delete ips created by node_provider
nic_id = vm_data["network_interfaces"][0]["id"]
res = ibm_vpc_client.list_instance_network_interface_floating_ips(
vm_data["id"], nic_id
).get_result()
floating_ips = res.get("floating_ips", [])
for ip in floating_ips:
if ip["name"].startswith(RAY_RECYCLABLE):
print(f"Deleting IP: {ip['id']}")
ibm_vpc_client.delete_floating_ip(ip["id"])
print(f"Deleting VM: {vm_data['id']}")
ibm_vpc_client.delete_instance(id=vm_data["id"])
res = ibm_vpc_client.list_instances(vpc_id=vpc_id).get_result()
num_instances = res["total_count"]
# Delete VSIs if exist
if num_instances:
instances = res["instances"]
with ThreadPoolExecutor(num_instances) as ex:
for i in range(num_instances):
ex.submit(_del_instance, instances[i])
# wait until all vms are deleted to proceed
_poll_vpc_contains_vms(vpc_id)
def delete_unbound_vpc(vpc_id):
deleting_resource = True
Expand All @@ -583,7 +620,7 @@ def delete_vpc(vpc_id):
print((f"Failed to find a VPC with id={vpc_id}"))
return
print(f"Deleting vpc:{vpc_data['name']} with id:{vpc_id}")
delete_instances(vpc_data['id'])
delete_vms(vpc_id)
delete_subnets(vpc_data)
delete_gateways(vpc_id)
delete_unbound_vpc(vpc_id)
Expand All @@ -596,7 +633,7 @@ def install_package(package):
pip_location_stdout = subprocess.run(['which', 'pip'], capture_output=True, text=True)
pip_location = pip_location_stdout.stdout.strip()
subprocess.call([pip_location, 'install', package], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
for package in ["ibm-vpc", "ibm-cloud-sdk-core"]:
install_package(package)
Expand All @@ -614,10 +651,10 @@ def install_package(package):
raise Exception("VPC not found in any region")
ibm_vpc_client.set_service_url(f'https://{region}.iaas.cloud.ibm.com/v1')
delete_vpc(vpc_id=vpc_id)
return {"Status": "Success"}
"""
"""
)

def get_headers(self):
Expand Down

0 comments on commit 023381b

Please sign in to comment.