Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Launching existing cluster in the same zone to avoid leakage #1700

Merged
merged 18 commits into from
Feb 22, 2023
Merged
45 changes: 41 additions & 4 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,15 @@
_RAY_YAML_KEYS_TO_RESTORE_FOR_BACK_COMPATIBILITY = {
'cluster_name', 'provider', 'auth', 'node_config'
}
# For these keys, don't use the old yaml's version and instead use the new yaml's.
# - zone: The zone field of the old yaml may be '1a,1b,1c' (AWS) while the actual
# zone of the launched cluster is '1a'. If we restore, then on capacity errors
# it's possible to failover to 1b, which leaves a leaked instance in 1a. Here,
# we use the new yaml's zone field, which is guaranteed to be the existing zone
# '1a'.
_RAY_YAML_KEYS_TO_RESTORE_EXCEPTIONS = [
('provider', 'availability_zone'),
]


def is_ip(s: str) -> bool:
Expand Down Expand Up @@ -694,17 +703,22 @@ def _remove_multinode_config(


def _replace_yaml_dicts(new_yaml: str, old_yaml: str,
key_names: Set[str]) -> str:
"""Replaces 'new' with 'old' for all keys in key_names.
restore_key_names: Set[str],
restore_key_names_exceptions: List[List[str]]) -> str:
"""Replaces 'new' with 'old' for all keys in restore_key_names.

The replacement will be applied recursively and only for the blocks
with the key in key_names, and have the same ancestors in both 'new'
and 'old' YAML tree.

The restore_key_names_exceptions is a list of key names that should not
be restored, i.e. those keys will be reset to the value in 'new' YAML
tree after the replacement.
"""

def _restore_block(new_block: Dict[str, Any], old_block: Dict[str, Any]):
for key, value in new_block.items():
if key in key_names:
if key in restore_key_names:
if key in old_block:
new_block[key] = old_block[key]
else:
Expand All @@ -715,7 +729,29 @@ def _restore_block(new_block: Dict[str, Any], old_block: Dict[str, Any]):

new_config = yaml.safe_load(new_yaml)
old_config = yaml.safe_load(old_yaml)
excluded_results = {}
# Find all key values excluded from restore
for exclude_restore_key_name_list in restore_key_names_exceptions:
excluded_result = new_config
found_excluded_key = True
for key in exclude_restore_key_name_list:
if (not isinstance(excluded_result, dict) or
key not in excluded_result):
found_excluded_key = False
break
excluded_result = excluded_result[key]
if found_excluded_key:
excluded_results[exclude_restore_key_name_list] = excluded_result

# Restore from old config
_restore_block(new_config, old_config)

# Revert the changes for the excluded key values
for exclude_restore_key_name, value in excluded_results.items():
curr = new_config
for key in exclude_restore_key_name[:-1]:
curr = curr[key]
curr[exclude_restore_key_name[-1]] = value
return common_utils.dump_yaml_str(new_config)


Expand Down Expand Up @@ -885,7 +921,8 @@ def write_cluster_config(
new_yaml_content = f.read()
restored_yaml_content = _replace_yaml_dicts(
new_yaml_content, old_yaml_content,
_RAY_YAML_KEYS_TO_RESTORE_FOR_BACK_COMPATIBILITY)
_RAY_YAML_KEYS_TO_RESTORE_FOR_BACK_COMPATIBILITY,
_RAY_YAML_KEYS_TO_RESTORE_EXCEPTIONS)
with open(tmp_yaml_path, 'w') as f:
f.write(restored_yaml_content)

Expand Down
331 changes: 189 additions & 142 deletions sky/backends/cloud_vm_ray_backend.py

Large diffs are not rendered by default.

24 changes: 16 additions & 8 deletions sky/clouds/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,22 +144,32 @@ def regions_with_offering(cls, instance_type: Optional[str],
return regions

@classmethod
def region_zones_provision_loop(
def zones_provision_loop(
cls,
*,
region: str,
num_nodes: int,
instance_type: Optional[str] = None,
accelerators: Optional[Dict[str, int]] = None,
use_spot: bool = False,
) -> Iterator[Tuple[clouds.Region, List[clouds.Zone]]]:
) -> Iterator[List[clouds.Zone]]:
# AWS provisioner can handle batched requests, so yield all zones under
# each region.
regions = cls.regions_with_offering(instance_type,
accelerators,
use_spot,
region=None,
region=region,
zone=None)
for region in regions:
yield region, region.zones
for r in regions:
assert r.zones is not None, r
if num_nodes > 1:
# When num_nodes > 1, we need to try the zones one by
# one, to avoid the nodes of a same cluster being
# placed in different zones.
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
for z in r.zones:
yield [z]
else:
yield r.zones

@classmethod
def _get_default_ami(cls, region_name: str, instance_type: str) -> str:
Expand Down Expand Up @@ -323,9 +333,7 @@ def make_deploy_resources_variables(
'Set either both or neither for: region, zones.')
region = self._get_default_region()
zones = region.zones
else:
assert zones is not None, (
'Set either both or neither for: region, zones.')
assert zones is not None, (region, zones)

region_name = region.name
zone_names = [zone.name for zone in zones]
Expand Down
19 changes: 10 additions & 9 deletions sky/clouds/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ def regions_with_offering(cls, instance_type: Optional[str],
use_spot: bool, region: Optional[str],
zone: Optional[str]) -> List[clouds.Region]:
del accelerators # unused
assert zone is None, 'Azure does not support zones'
if instance_type is None:
# Fall back to default regions
regions = cls.regions()
Expand All @@ -182,27 +183,27 @@ def regions_with_offering(cls, instance_type: Optional[str],

if region is not None:
regions = [r for r in regions if r.name == region]
if zone is not None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @WoosukKwon to double check - should be ok?

for r in regions:
r.set_zones([z for z in r.zones if z.name == zone])
regions = [r for r in regions if r.zones]
return regions

@classmethod
def region_zones_provision_loop(
def zones_provision_loop(
cls,
*,
region: str,
num_nodes: int,
instance_type: Optional[str] = None,
accelerators: Optional[Dict[str, int]] = None,
use_spot: bool = False,
) -> Iterator[Tuple[clouds.Region, List[clouds.Zone]]]:
) -> Iterator[None]:
del num_nodes # unused
regions = cls.regions_with_offering(instance_type,
accelerators,
use_spot,
region=None,
region=region,
zone=None)
for region in regions:
yield region, region.zones
for r in regions:
assert r.zones is None, r
yield r.zones

# TODO: factor the following three methods, as they are the same logic
# between Azure and AWS.
Expand Down
43 changes: 36 additions & 7 deletions sky/clouds/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class CloudImplementationFeatures(enum.Enum):
class Region(collections.namedtuple('Region', ['name'])):
"""A region."""
name: str
zones: List['Zone'] = []
zones: Optional[List['Zone']] = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thought for the future: in a few cloud impl's, their regions() method explicitly hard code the regions. We could change those impls to reading from their respective catalogs to be more consistent. (Rationale is was confused for a bit where we define a region's zones. We do it both in that method & in catalog reader methods, IIUC.)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are actually using the region/zones from the catalog and those hardcoded regions are just fallbacks. It should be fine to remove them. We can do it in another PR. : )


def set_zones(self, zones: List['Zone']):
self.zones = zones
Expand Down Expand Up @@ -126,30 +126,59 @@ def regions_with_offering(cls, instance_type: Optional[str],
raise NotImplementedError

@classmethod
def region_zones_provision_loop(
def zones_provision_loop(
cls,
*,
region: str,
num_nodes: int,
instance_type: Optional[str] = None,
accelerators: Optional[Dict[str, int]] = None,
use_spot: bool = False,
) -> Iterator[Tuple[Region, List[Zone]]]:
"""Loops over (region, zones) to retry for provisioning.
) -> Iterator[Optional[List[Zone]]]:
"""Loops over zones to retry for provisioning in a given region.
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved

Certain clouds' provisioners may handle batched requests, retrying for
itself a list of zones under a region. Others may need a specific zone
per provision request (in that case, yields (region, a one-element list
for each zone)).
per provision request (in that case, yields a one-element list for each
zone).
Optionally, caller can filter the yielded region/zones by specifying the
instance_type, accelerators, and use_spot.

Args:
region: The region to provision.
num_nodes: The number of nodes to provision.
instance_type: The instance type to provision.
accelerators: The accelerators to provision.
use_spot: Whether to use spot instances.

Yields:
A list of zones that offer the requested resources in the given
region, in the order of price.
(1) If there is no zone that offers the specified resources, nothing
is yielded. For example, Azure does not support zone, and
calling this method with non-existing instance_type in the given
region, will yield nothing, i.e. raise StopIteration.
```
for zone in Azure.zones_provision_loop(region=region,
instance_type='non-existing'):
# Will not reach here.
```
(2) If the cloud's provisioner does not support `Zone`s, `None` will
be yielded.
```
for zone in Azure.zones_provision_loop(region=region,
instance_type='existing-instance'):
assert zone is None
```
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
This means if something is yielded, either it's None (zones are not
supported and the region offers the resources) or it's a non-empty
list (zones are supported and they offer the resources).

Typical usage:

for region, zones in cloud.region_zones_provision_loop(
for zones in cloud.region_zones_provision_loop(
region,
num_nodes,
instance_type,
accelerators,
use_spot
Expand Down
23 changes: 14 additions & 9 deletions sky/clouds/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ def regions_with_offering(cls, instance_type: Optional[str],
for r2 in vm_regions:
if r1.name != r2.name:
continue
assert r1.zones is not None, r1
assert r2.zones is not None, r2
zones = []
for z1 in r1.zones:
for z2 in r2.zones:
Expand All @@ -195,27 +197,32 @@ def regions_with_offering(cls, instance_type: Optional[str],
regions = [r for r in regions if r.name == region]
if zone is not None:
for r in regions:
assert r.zones is not None, r
r.set_zones([z for z in r.zones if z.name == zone])
regions = [r for r in regions if r.zones]
return regions

@classmethod
def region_zones_provision_loop(
def zones_provision_loop(
cls,
*,
region: str,
num_nodes: int,
instance_type: Optional[str] = None,
accelerators: Optional[Dict[str, int]] = None,
use_spot: bool = False,
) -> Iterator[Tuple[clouds.Region, List[clouds.Zone]]]:
) -> Iterator[List[clouds.Zone]]:
del num_nodes # Unused.
regions = cls.regions_with_offering(instance_type,
accelerators,
use_spot,
region=None,
region=region,
zone=None)
# GCP provisioner currently takes 1 zone per request.
for region in regions:
for zone in region.zones:
yield (region, [zone])
for r in regions:
assert r.zones is not None, r
for zone in r.zones:
yield [zone]

@classmethod
def get_zone_shell_cmd(cls) -> Optional[str]:
Expand Down Expand Up @@ -317,9 +324,7 @@ def make_deploy_resources_variables(
'Set either both or neither for: region, zones.')
region = self._get_default_region()
zones = region.zones
else:
assert zones is not None, (
'Set either both or neither for: region, zones.')
assert zones is not None, (region, zones)

region_name = region.name
zone_name = zones[0].name
Expand Down
15 changes: 10 additions & 5 deletions sky/clouds/lambda_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def regions_with_offering(cls, instance_type: Optional[str],
accelerators: Optional[Dict[str, int]],
use_spot: bool, region: Optional[str],
zone: Optional[str]) -> List[clouds.Region]:
assert zone is None, 'Lambda does not support zones.'
del accelerators, zone # unused
if use_spot:
return []
Expand All @@ -88,20 +89,24 @@ def regions_with_offering(cls, instance_type: Optional[str],
return regions

@classmethod
def region_zones_provision_loop(
def zones_provision_loop(
cls,
*,
region: str,
num_nodes: int,
instance_type: Optional[str] = None,
accelerators: Optional[Dict[str, int]] = None,
use_spot: bool = False,
) -> Iterator[Tuple[clouds.Region, List[clouds.Zone]]]:
) -> Iterator[None]:
del num_nodes # unused
regions = cls.regions_with_offering(instance_type,
accelerators,
use_spot,
region=None,
region=region,
zone=None)
for region in regions:
yield region, region.zones
for r in regions:
assert r.zones is None, r
yield r.zones

def instance_type_to_hourly_cost(self,
instance_type: str,
Expand Down
14 changes: 9 additions & 5 deletions sky/clouds/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,24 @@ def regions_with_offering(cls, instance_type: Optional[str],
return cls.regions()

@classmethod
def region_zones_provision_loop(
def zones_provision_loop(
cls,
*,
region: str,
num_nodes: int,
instance_type: Optional[str] = None,
accelerators: Optional[Dict[str, int]] = None,
use_spot: bool = False,
) -> Iterator[Tuple[clouds.Region, List[clouds.Zone]]]:
) -> Iterator[None]:
del num_nodes # Unused.
regions = cls.regions_with_offering(instance_type,
accelerators,
use_spot=use_spot,
region=None,
region=region,
zone=None)
for region in regions:
yield region, region.zones
for r in regions:
assert r.zones is None, r
yield r.zones

#### Normal methods ####

Expand Down
Loading