Skip to content

Commit

Permalink
[Provisioner] Open ports on AWS & GCP (#2210)
Browse files Browse the repository at this point in the history
* finished AWS & GCP, test passed on GCP

* GCP now create rules with cluster-specified tags

* Creating new SGs on AWS and configuring user specified rules

* new SG name

* move to resources & allow tcp only

* disable request of newer ports in task.yaml

* lint

* delete firewall rules & aws SGs after teardown

* add smoke test & cluster name length limit

* format

* add doc

* add error when not using aws & gcp

* add failover

* remove redundant length checking

* temporary remove failover and wait for #2245

* move to new provisioner api

* add unsupported feature

* add ports to resources.get_required_cloud_features

* use CloudImplementationFeatures to validate port

* remove assert for interrupted launching process

* renaming provision package

* AWS: only create new SG when port is specified

* nit: variable name

* remove redundant error message since check is done in resources validation

* support port range

* refactor GCP implementation

* remove unused function

* rename gcp rule name to avoid dependency and name too long

* remove redundant tags

* remove redundant silent argument

* change cluster name hash to truncate + hash

* format

* remove create firewall wait function

* rename hash_cluster_name to truncate_and_hash_cluster_name

* fix error when launching TPU node

* disable TPU with ports for now

* format

* enable tpu nodes

* enable tpu vm

* raise error instead of assert

* Update sky/skylet/providers/gcp/config.py

Co-authored-by: Zhanghao Wu <[email protected]>

---------

Co-authored-by: Zhanghao Wu <[email protected]>
  • Loading branch information
cblmemo and Michaelvll authored Aug 1, 2023
1 parent 1add829 commit fa4dd65
Show file tree
Hide file tree
Showing 26 changed files with 427 additions and 35 deletions.
7 changes: 7 additions & 0 deletions docs/source/reference/yaml-spec.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ Available fields:
# high: 6000 IOPS; 340 MB/s; write 250 MB/s
disk_tier: 'medium'
# Ports to expose (optional).
# Currently only TCP protocol is supported.
# Could be an integer or a range.
ports:
- 8080
- 10022-10040
# Additional accelerator metadata (optional); only used for TPU node
# and TPU VM.
# Example usage:
Expand Down
31 changes: 31 additions & 0 deletions examples/http_server_with_custom_ports/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import http.server
import socketserver

PORT = 33828


class MyHttpRequestHandler(http.server.SimpleHTTPRequestHandler):
    """Request handler that answers every GET with one fixed demo page."""

    def do_GET(self):
        """Send a 200 response whose body is the static demo HTML page.

        The request path is ignored: the same page is returned for any GET.
        """
        # The page is spelled out line by line so that the bytes sent to the
        # client do not depend on how this method body is indented.
        page = ('\n'
                '<html>\n'
                '<head>\n'
                '<title>Test Page</title>\n'
                '</head>\n'
                '<body>\n'
                '<h1>This is a demo HTML page.</h1>\n'
                '</body>\n'
                '</html>\n')
        payload = page.encode('utf8')

        # Status line and headers must be flushed before the body is written.
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write(payload)


# Alias kept at module level so the handler class used by the server is
# named in one place.
Handler = MyHttpRequestHandler

# An empty host string binds the server on all interfaces. PORT is the port
# this example serves on (the accompanying task.yaml lists the same port
# under `resources.ports`).
with socketserver.TCPServer(("", PORT), Handler) as httpd:
    print("serving at port", PORT)
    # Blocks until the process is interrupted.
    httpd.serve_forever()
7 changes: 7 additions & 0 deletions examples/http_server_with_custom_ports/task.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
resources:
ports:
- 33828

workdir: ./examples/http_server_with_custom_ports

run: python3 server.py
10 changes: 8 additions & 2 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,7 @@ def _restore_block(new_block: Dict[str, Any], old_block: Dict[str, Any]):
def write_cluster_config(
to_provision: 'resources.Resources',
num_nodes: int,
ports: Optional[List[Union[int, str]]],
cluster_config_template: str,
cluster_name: str,
local_wheel_path: pathlib.Path,
Expand Down Expand Up @@ -897,6 +898,11 @@ def write_cluster_config(
f'open(os.path.expanduser("{constants.SKY_REMOTE_RAY_PORT_FILE}"), "w"))\''
)

# Only using new security group names for clusters with ports specified.
default_aws_sg_name = f'sky-sg-{common_utils.user_and_hostname_hash()}'
if ports is not None:
default_aws_sg_name += f'-{common_utils.truncate_and_hash_cluster_name(cluster_name)}'

# Use a tmp file path to avoid incomplete YAML file being re-used in the
# future.
tmp_yaml_path = yaml_path + '.tmp'
Expand All @@ -907,6 +913,7 @@ def write_cluster_config(
**{
'cluster_name': cluster_name,
'num_nodes': num_nodes,
'ports': ports,
'disk_size': to_provision.disk_size,
# If the current code is run by controller, propagate the real
# calling user which should've been passed in as the
Expand All @@ -921,8 +928,7 @@ def write_cluster_config(
# (username, last 4 chars of hash of hostname): for uniquefying
# users on shared-account scenarios.
'security_group': skypilot_config.get_nested(
('aws', 'security_group_name'),
f'sky-sg-{common_utils.user_and_hostname_hash()}'),
('aws', 'security_group_name'), default_aws_sg_name),
'vpc_name': skypilot_config.get_nested(('aws', 'vpc_name'),
None),
'use_internal_ips': skypilot_config.get_nested(
Expand Down
8 changes: 8 additions & 0 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -1455,6 +1455,7 @@ def _retry_zones(
config_dict = backend_utils.write_cluster_config(
to_provision,
num_nodes,
to_provision.ports,
_get_cluster_config_template(to_provision.cloud),
cluster_name,
self._local_wheel_path,
Expand Down Expand Up @@ -3647,6 +3648,13 @@ def post_teardown_cleanup(self,
f'Failed to delete cloned image {image_id}. Please '
'remove it manually to avoid image leakage. Details: '
f'{common_utils.format_exception(e, use_bracket=True)}')
if terminate:
cloud = handle.launched_resources.cloud
config = common_utils.read_yaml(handle.cluster_yaml)
if isinstance(cloud, (clouds.AWS, clouds.GCP)):
# Clean up the resources created for opened ports (AWS security
# group / GCP firewall rules).
provision_lib.cleanup_ports(repr(cloud), handle.cluster_name,
config['provider'])

# The cluster file must exist because the cluster_yaml will only
# be removed after the cluster entry in the database is removed.
Expand Down
1 change: 1 addition & 0 deletions sky/clouds/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def _cloud_unsupported_features(
# TODO(zhwu): our azure subscription offer ID does not support spot.
# Need to support it.
clouds.CloudImplementationFeatures.SPOT_INSTANCE: f'Spot instances are not supported in {cls._REPR}.',
clouds.CloudImplementationFeatures.OPEN_PORTS: f'Opening ports is not supported in {cls._REPR}.',
}

@classmethod
Expand Down
1 change: 1 addition & 0 deletions sky/clouds/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class CloudImplementationFeatures(enum.Enum):
CLONE_DISK_FROM_CLUSTER = 'clone_disk_from_cluster'
SPOT_INSTANCE = 'spot_instance'
CUSTOM_DISK_TIER = 'custom_disk_tier'
OPEN_PORTS = 'open_ports'


class Region(collections.namedtuple('Region', ['name'])):
Expand Down
2 changes: 2 additions & 0 deletions sky/clouds/ibm.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ def _cloud_unsupported_features(
(f'Migrating disk is not supported in {cls._REPR}.'),
clouds.CloudImplementationFeatures.CUSTOM_DISK_TIER:
(f'Custom disk tier is not supported in {cls._REPR}.'),
clouds.CloudImplementationFeatures.OPEN_PORTS:
(f'Opening ports is not supported in {cls._REPR}.'),
}

@classmethod
Expand Down
1 change: 1 addition & 0 deletions sky/clouds/lambda_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class Lambda(clouds.Cloud):
clouds.CloudImplementationFeatures.CLONE_DISK_FROM_CLUSTER: f'Migrating disk is not supported in {_REPR}.',
clouds.CloudImplementationFeatures.SPOT_INSTANCE: f'Spot instances are not supported in {_REPR}.',
clouds.CloudImplementationFeatures.CUSTOM_DISK_TIER: f'Custom disk tiers are not supported in {_REPR}.',
clouds.CloudImplementationFeatures.OPEN_PORTS: f'Opening ports is not supported in {_REPR}.',
}

@classmethod
Expand Down
2 changes: 2 additions & 0 deletions sky/clouds/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class Local(clouds.Cloud):
('Local cloud does not support stopping instances.'),
clouds.CloudImplementationFeatures.CLONE_DISK_FROM_CLUSTER:
('Migrating disk is not supported for Local.'),
clouds.CloudImplementationFeatures.OPEN_PORTS:
('Opening ports is not supported for Local.'),
}

@classmethod
Expand Down
2 changes: 2 additions & 0 deletions sky/clouds/oci.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ def _cloud_unsupported_features(
return {
clouds.CloudImplementationFeatures.CLONE_DISK_FROM_CLUSTER:
(f'Migrating disk is not supported in {cls._REPR}.'),
clouds.CloudImplementationFeatures.OPEN_PORTS:
(f'Opening ports is not supported in {cls._REPR}.'),
}

@classmethod
Expand Down
2 changes: 2 additions & 0 deletions sky/clouds/scp.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class SCP(clouds.Cloud):
(f'Spot instances are not supported in {_REPR}.'),
clouds.CloudImplementationFeatures.CUSTOM_DISK_TIER:
(f'Custom disk tiers are not supported in {_REPR}.'),
clouds.CloudImplementationFeatures.OPEN_PORTS:
(f'Opening ports is not supported in {_REPR}.'),
}

_INDENT_PREFIX = ' '
Expand Down
10 changes: 10 additions & 0 deletions sky/provision/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,13 @@ def terminate_instances(
) -> None:
"""Terminate running or stopped instances."""
raise NotImplementedError


@_route_to_cloud_impl
def cleanup_ports(
    provider_name: str,
    cluster_name: str,
    provider_config: Optional[Dict[str, Any]] = None,
) -> None:
    """Delete any opened ports.

    Args:
        provider_name: Name of the cloud whose implementation should handle
            the call. NOTE(review): presumably consumed by
            `_route_to_cloud_impl` to pick the per-cloud `cleanup_ports`
            (the cloud-specific versions take only the remaining two
            arguments) -- confirm against the decorator.
        cluster_name: Name of the cluster whose ports should be cleaned up.
        provider_config: The `provider` section of the cluster config.

    Raises:
        NotImplementedError: If this body itself is executed, i.e. the call
            was not served by a cloud-specific implementation.
    """
    raise NotImplementedError
4 changes: 2 additions & 2 deletions sky/provision/aws/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""AWS provisioner for SkyPilot."""

from sky.provision.aws.instance import (query_instances, terminate_instances,
stop_instances)
from sky.provision.aws.instance import (cleanup_ports, query_instances,
terminate_instances, stop_instances)
32 changes: 32 additions & 0 deletions sky/provision/aws/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from botocore import config

from sky.adaptors import aws
from sky.utils import common_utils
from sky import status_lib

BOTO_MAX_RETRIES = 12
Expand Down Expand Up @@ -134,9 +135,40 @@ def terminate_instances(
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2.html#EC2.Instance
instances = _filter_instances(ec2, filters, None, None)
instances.terminate()
if 'ports' not in provider_config:
return
# If ports are specified, we need to delete the newly created Security
# Group. Here we wait for all instances to be terminated, since the
# Security Group cannot be deleted while instances still depend on it.
for instance in instances:
instance.wait_until_terminated()
# TODO(suquark): Currently, the implementation of GCP and Azure will
# wait until the cluster is fully terminated, while other clouds just
# trigger the termination process (via http call) and then return.
# It's not clear which behavior should be expected. We will not
# wait for the termination for now, since this is the default behavior
# of most cloud implementations (including AWS).


def cleanup_ports(
    cluster_name: str,
    provider_config: Optional[Dict[str, Any]] = None,
) -> None:
    """Delete the per-cluster AWS security group created for opened ports.

    See sky/provision/__init__.py for the cloud-agnostic entry point.

    Args:
        cluster_name: Name of the cluster; used to derive the security group
            name.
        provider_config: The `provider` section of the cluster config. Must
            not be None. If it has no 'ports' key, no dedicated security
            group was requested and this function is a no-op.

    Raises:
        ValueError: If the lookup does not return exactly one security group
            with the expected name.
    """
    assert provider_config is not None, (cluster_name, provider_config)
    if 'ports' not in provider_config:
        return
    region = provider_config['region']
    ec2 = aws.resource(
        'ec2',
        region_name=region,
        config=config.Config(retries={'max_attempts': BOTO_MAX_RETRIES}))
    # TODO(tian): Add a function to generate SG name for AWS, then replace here
    # and backend_utils::write_cluster_config
    # NOTE(review): this reconstructs the *default* name only. If the user
    # configured a custom `aws.security_group_name`, the group created at
    # launch presumably has a different name and this lookup will not find
    # it -- confirm the intended behavior for that case.
    sg_name = (f'sky-sg-{common_utils.user_and_hostname_hash()}'
               f'-{common_utils.truncate_and_hash_cluster_name(cluster_name)}')
    # Materialize the lazy collection exactly once: every iteration of a
    # boto3 collection issues a new request, so checking the length on one
    # pass and indexing a second pass could act on a different result.
    matching_sgs = list(ec2.security_groups.filter(GroupNames=[sg_name]))
    if len(matching_sgs) != 1:
        raise ValueError(f'Expected security group {sg_name} not found. '
                         'Cannot cleanup ports.')
    matching_sgs[0].delete()
2 changes: 1 addition & 1 deletion sky/provision/gcp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""GCP provisioner for SkyPilot."""

from sky.provision.gcp.instance import stop_instances, terminate_instances
from sky.provision.gcp.instance import stop_instances, terminate_instances, cleanup_ports
19 changes: 19 additions & 0 deletions sky/provision/gcp/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from sky import sky_logging
from sky.provision.gcp import instance_utils
from sky.utils import common_utils

logger = sky_logging.init_logger(__name__)

Expand Down Expand Up @@ -151,3 +152,21 @@ def terminate_instances(
_wait_for_operations(operations, project_id, zone)
# We don't wait for the instances to be terminated, as it can take a long
# time (same as what we did in ray's node_provider).


def cleanup_ports(
    cluster_name: str,
    provider_config: Optional[Dict[str, Any]] = None,
) -> None:
    """Remove the GCP firewall rules that were created for opened ports.

    See sky/provision/__init__.py for the cloud-agnostic entry point.

    One rule per entry of `provider_config['ports']` is deleted; rule names
    are derived from the truncated-and-hashed cluster name and the port
    entry. When the config has no 'ports' key there is nothing to delete.
    """
    assert provider_config is not None, cluster_name
    if 'ports' in provider_config:
        project = provider_config['project_id']
        name_hash = common_utils.truncate_and_hash_cluster_name(cluster_name)
        # Every rule of this cluster shares the same name prefix; only the
        # trailing port entry differs.
        rule_prefix = f'user-ports-{name_hash}-'
        for port_entry in provider_config['ports']:
            instance_utils.GCPComputeInstance.delete_firewall_rule(
                project, f'{rule_prefix}{port_entry}')
25 changes: 25 additions & 0 deletions sky/provision/gcp/instance_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ def filter(
) -> List[str]:
raise NotImplementedError

@classmethod
def delete_firewall_rule(
    cls,
    project_id: str,
    firewall_rule_name: str,
) -> None:
    """Delete the firewall rule `firewall_rule_name` in `project_id`.

    This base version provides no implementation; subclasses that support
    firewall rules are expected to override it.

    Raises:
        NotImplementedError: Always, in this base version.
    """
    raise NotImplementedError


class GCPComputeInstance(GCPInstance):
"""Instance handler for GCP compute instances."""
Expand Down Expand Up @@ -182,6 +190,23 @@ def wait_for_operation(cls, operation: dict, project_id: str,
return True
return False

@classmethod
def delete_firewall_rule(
    cls,
    project_id: str,
    firewall_rule_name: str,
) -> None:
    """Delete a firewall rule by name, skipping it if it does not exist.

    Args:
        project_id: GCP project that owns the firewall rule.
        firewall_rule_name: Exact name of the rule to delete.
    """
    compute = cls.load_resource()
    # The list request must be executed: the object returned by `list(...)`
    # is only a pending request and is always truthy, so testing it directly
    # would never detect a missing rule.
    response = compute.firewalls().list(
        project=project_id,
        filter=f'name={firewall_rule_name}').execute()
    # The response carries matching rules under 'items'; the key is absent
    # (or empty) when nothing matched the filter.
    if not response.get('items'):
        logger.warning(f'Firewall rule {firewall_rule_name} not found. '
                       'Skip cleanup.')
        return
    compute.firewalls().delete(
        project=project_id,
        firewall=firewall_rule_name,
    ).execute()


class GCPTPUVMInstance(GCPInstance):
"""Instance handler for GCP TPU node."""
Expand Down
Loading

0 comments on commit fa4dd65

Please sign in to comment.