Skip to content

Commit

Permalink
[Optimizer] Optimizing at the region and zone levels (#975)
Browse files Browse the repository at this point in the history
  • Loading branch information
WoosukKwon authored Dec 29, 2022
1 parent 29651f7 commit 63d057b
Show file tree
Hide file tree
Showing 14 changed files with 518 additions and 133 deletions.
70 changes: 52 additions & 18 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,12 @@ def _update_blocklist_on_gcp_error(self, region, zones, stdout, stderr):
# This skip is only correct if we implement "first
# retry the region/zone of an existing cluster with the
# same name" correctly.
for r, _ in clouds.GCP.region_zones_provision_loop():
for r in clouds.GCP.regions_with_offering(
instance_type=None,
accelerators=None,
use_spot=False,
region=None,
zone=None):
self._blocked_regions.add(r.name)
else:
# Per region. Ex: Quota 'CPUS' exceeded. Limit: 24.0
Expand Down Expand Up @@ -667,7 +672,6 @@ def _update_blocklist_on_gcp_error(self, region, zones, stdout, stderr):
'check logs above.')

def _update_blocklist_on_aws_error(self, region, zones, stdout, stderr):
del zones # Unused.
style = colorama.Style
stdout_splits = stdout.split('\n')
stderr_splits = stderr.split('\n')
Expand Down Expand Up @@ -698,9 +702,13 @@ def _update_blocklist_on_aws_error(self, region, zones, stdout, stderr):
with ux_utils.print_exception_no_traceback():
raise RuntimeError('Errors occurred during provision; '
'check logs above.')
# The underlying ray autoscaler / boto3 will try all zones of a region
# at once.
logger.warning(f'Got error(s) in all zones of {region.name}:')
if set(zones) == set(region.zones):
# The underlying ray autoscaler / boto3 will try all zones of a
# region at once.
logger.warning(f'Got error(s) in all zones of {region.name}:')
else:
zones_str = ', '.join(z.name for z in zones)
logger.warning(f'Got error(s) in {zones_str}:')
messages = '\n\t'.join(errors)
logger.warning(f'{style.DIM}\t{messages}{style.RESET_ALL}')
self._blocked_regions.add(region.name)
Expand Down Expand Up @@ -1022,8 +1030,11 @@ def _retry_region_zones(self,
cluster_exists):
if self._in_blocklist(to_provision.cloud, region, zones):
continue
zone_str = ','.join(
z.name for z in zones) if zones is not None else 'all zones'
if not zones:
# For Azure, zones is always an empty list.
zone_str = 'all zones'
else:
zone_str = ','.join(z.name for z in zones)
try:
config_dict = backend_utils.write_cluster_config(
to_provision,
Expand Down Expand Up @@ -1157,9 +1168,19 @@ def _retry_region_zones(self,
CloudVmRayBackend().teardown_no_lock(handle,
terminate=need_terminate)

message = ('Failed to acquire resources in all regions/zones of '
f'{to_provision.cloud}. '
'Try changing resource requirements or use another cloud.')
if to_provision.zone is not None:
message = (
f'Failed to acquire resources in {to_provision.zone}. '
'Try changing resource requirements or use another zone.')
elif to_provision.region is not None:
# For public clouds, provision.region is always set.
message = ('Failed to acquire resources in all zones in '
f'{to_provision.region}. Try changing resource '
'requirements or use another region.')
else:
message = (f'Failed to acquire resources in {to_provision.cloud}. '
'Try changing resource requirements or use another '
'cloud provider.')
# Do not failover to other clouds if the cluster was previously
# UP or STOPPED, since the user can have some data on the cluster.
raise exceptions.ResourcesUnavailableError(
Expand Down Expand Up @@ -1534,6 +1555,13 @@ def provision_with_retries(
logger.warning(f'\n{style.BRIGHT}Provision failed for {num_nodes}x '
f'{to_provision}. Trying other launchable resources '
f'(if any).{style.RESET_ALL}')
if to_provision.zone is None:
region_or_zone_str = str(to_provision.region)
else:
region_or_zone_str = str(to_provision.zone)
logger.warning(f'\n{style.BRIGHT}Provision failed for {num_nodes}x '
f'{to_provision} in {region_or_zone_str}. '
f'Trying other locations (if any).{style.RESET_ALL}')
if not cluster_exists:
# Add failed resources to the blocklist, only when it
# is in fallback mode.
Expand Down Expand Up @@ -1914,14 +1942,20 @@ def _provision(self,
backoff = common_utils.Backoff(_RETRY_UNTIL_UP_INIT_GAP_SECONDS)
attempt_cnt = 1
while True:
# RetryingVmProvisioner will retry within a cloud's regions
# first (if a region is not explicitly requested), then
# optionally retry on all other clouds (if
# backend.register_info() has been called). After this "round"
# of optimization across clouds, provisioning may still have
# not succeeded. This while loop will then kick in if
# retry_until_up is set, which will kick off new "rounds" of
# optimization infinitely.
# For on-demand instances, RetryingVmProvisioner will retry
# within the given region first, then optionally retry on all
# other clouds and regions (if backend.register_info()
# has been called).
# For spot instances, each provisioning request is made for a
# single zone and the provisioner will retry on all other
# clouds, regions, and zones.
# See optimizer.py#_make_launchables_for_valid_region_zones()
# for detailed reasons.

# After this "round" of optimization across clouds, provisioning
# may still have not succeeded. This while loop will then kick
# in if retry_until_up is set, which will kick off new "rounds"
# of optimization infinitely.
try:
provisioner = RetryingVmProvisioner(self.log_dir, self._dag,
self._optimize_target,
Expand Down
61 changes: 46 additions & 15 deletions sky/clouds/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,27 @@ def regions(cls):
]
return cls._regions

@classmethod
def regions_with_offering(cls, instance_type: Optional[str],
accelerators: Optional[Dict[str, int]],
use_spot: bool, region: Optional[str],
zone: Optional[str]) -> List[clouds.Region]:
del accelerators # unused
if instance_type is None:
# Fall back to default regions
regions = cls.regions()
else:
regions = service_catalog.get_region_zones_for_instance_type(
instance_type, use_spot, 'aws')

if region is not None:
regions = [r for r in regions if r.name == region]
if zone is not None:
for r in regions:
r.set_zones([z for z in r.zones if z.name == zone])
regions = [r for r in regions if r.zones]
return regions

@classmethod
def region_zones_provision_loop(
cls,
Expand All @@ -115,14 +136,11 @@ def region_zones_provision_loop(
) -> Iterator[Tuple[clouds.Region, List[clouds.Zone]]]:
# AWS provisioner can handle batched requests, so yield all zones under
# each region.
del accelerators # unused

if instance_type is None:
# fallback to manually specified region/zones
regions = cls.regions()
else:
regions = service_catalog.get_region_zones_for_instance_type(
instance_type, use_spot, 'aws')
regions = cls.regions_with_offering(instance_type,
accelerators,
use_spot,
region=None,
zone=None)
for region in regions:
yield region, region.zones

Expand Down Expand Up @@ -207,14 +225,23 @@ def get_zone_shell_cmd(cls) -> Optional[str]:

#### Normal methods ####

def instance_type_to_hourly_cost(self, instance_type: str, use_spot: bool):
def instance_type_to_hourly_cost(self,
instance_type: str,
use_spot: bool,
region: Optional[str] = None,
zone: Optional[str] = None) -> float:
return service_catalog.get_hourly_cost(instance_type,
region=None,
use_spot=use_spot,
region=region,
zone=zone,
clouds='aws')

def accelerators_to_hourly_cost(self, accelerators,
use_spot: bool) -> float:
def accelerators_to_hourly_cost(self,
accelerators: Dict[str, int],
use_spot: bool,
region: Optional[str] = None,
zone: Optional[str] = None) -> float:
del accelerators, use_spot, region, zone # unused
# AWS includes accelerators as part of the instance type. Implementing
# this is also necessary for e.g., the instance may have 4 GPUs, while
# the task specifies to use 1 GPU.
Expand Down Expand Up @@ -336,9 +363,13 @@ def _make(instance_list):
assert len(accelerators) == 1, resources
acc, acc_count = list(accelerators.items())[0]
(instance_list, fuzzy_candidate_list
) = service_catalog.get_instance_type_for_accelerator(acc,
acc_count,
clouds='aws')
) = service_catalog.get_instance_type_for_accelerator(
acc,
acc_count,
use_spot=resources.use_spot,
region=resources.region,
zone=resources.zone,
clouds='aws')
if instance_list is None:
return ([], fuzzy_candidate_list)
return (_make(instance_list), fuzzy_candidate_list)
Expand Down
60 changes: 46 additions & 14 deletions sky/clouds/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,23 @@ class Azure(clouds.Cloud):
_REPR = 'Azure'
_regions: List[clouds.Region] = []

def instance_type_to_hourly_cost(self, instance_type, use_spot):
def instance_type_to_hourly_cost(self,
instance_type: str,
use_spot: bool,
region: Optional[str] = None,
zone: Optional[str] = None) -> float:
return service_catalog.get_hourly_cost(instance_type,
region=None,
use_spot=use_spot,
region=region,
zone=zone,
clouds='azure')

def accelerators_to_hourly_cost(self, accelerators, use_spot):
def accelerators_to_hourly_cost(self,
accelerators: Dict[str, int],
use_spot: bool,
region: Optional[str] = None,
zone: Optional[str] = None) -> float:
del accelerators, use_spot, region, zone # unused
# Azure includes accelerators as part of the instance type.
# Implementing this is also necessary for e.g., the instance may have 4
# GPUs, while the task specifies to use 1 GPU.
Expand Down Expand Up @@ -140,6 +150,27 @@ def regions(cls) -> List[clouds.Region]:
]
return cls._regions

@classmethod
def regions_with_offering(cls, instance_type: Optional[str],
accelerators: Optional[Dict[str, int]],
use_spot: bool, region: Optional[str],
zone: Optional[str]) -> List[clouds.Region]:
del accelerators # unused
if instance_type is None:
# Fall back to default regions
regions = cls.regions()
else:
regions = service_catalog.get_region_zones_for_instance_type(
instance_type, use_spot, 'azure')

if region is not None:
regions = [r for r in regions if r.name == region]
if zone is not None:
for r in regions:
r.set_zones([z for z in r.zones if z.name == zone])
regions = [r for r in regions if r.zones]
return regions

@classmethod
def region_zones_provision_loop(
cls,
Expand All @@ -148,14 +179,11 @@ def region_zones_provision_loop(
accelerators: Optional[Dict[str, int]] = None,
use_spot: bool = False,
) -> Iterator[Tuple[clouds.Region, List[clouds.Zone]]]:
del accelerators # unused

if instance_type is None:
# fallback to manually specified region/zones
regions = cls.regions()
else:
regions = service_catalog.get_region_zones_for_instance_type(
instance_type, use_spot, clouds='azure')
regions = cls.regions_with_offering(instance_type,
accelerators,
use_spot,
region=None,
zone=None)
for region in regions:
yield region, region.zones

Expand Down Expand Up @@ -250,9 +278,13 @@ def _make(instance_list):
assert len(accelerators) == 1, resources
acc, acc_count = list(accelerators.items())[0]
(instance_list, fuzzy_candidate_list
) = service_catalog.get_instance_type_for_accelerator(acc,
acc_count,
clouds='azure')
) = service_catalog.get_instance_type_for_accelerator(
acc,
acc_count,
use_spot=resources.use_spot,
region=resources.region,
zone=resources.zone,
clouds='azure')
if instance_list is None:
return ([], fuzzy_candidate_list)
return (_make(instance_list), fuzzy_candidate_list)
Expand Down
30 changes: 27 additions & 3 deletions sky/clouds/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,27 @@ class Cloud:
def regions(cls) -> List[Region]:
raise NotImplementedError

@classmethod
def regions_with_offering(cls, instance_type: Optional[str],
accelerators: Optional[Dict[str, int]],
use_spot: bool, region: Optional[str],
zone: Optional[str]) -> List[Region]:
"""Returns the regions that offer the specified resources.
The order of the regions follow the order of the regions returned by
service_catalog/common.py#get_region_zones().
When region or zone is not None, the returned value will be limited to
the specified region/zone.
Returns:
A set of `Region`s that have the offerings for the specified
resources.
For each `Region` in the set, `region.zones` is the list of `Zone`s
which have the offerings. For the clouds that do not expose `Zone`s,
`region.zones` is an empty list.
"""
raise NotImplementedError

@classmethod
def region_zones_provision_loop(
cls,
Expand Down Expand Up @@ -103,12 +124,15 @@ def get_zone_shell_cmd(cls) -> Optional[str]:

#### Normal methods ####

# TODO: incorporate region/zone into the API.
def instance_type_to_hourly_cost(self, instance_type, use_spot):
def instance_type_to_hourly_cost(self, instance_type: str, use_spot: bool,
region: Optional[str],
zone: Optional[str]) -> float:
"""Returns the hourly on-demand/spot price for an instance type."""
raise NotImplementedError

def accelerators_to_hourly_cost(self, accelerators, use_spot):
def accelerators_to_hourly_cost(self, accelerators: Dict[str, int],
use_spot: bool, region: Optional[str],
zone: Optional[str]) -> float:
"""Returns the hourly on-demand price for accelerators."""
raise NotImplementedError

Expand Down
Loading

0 comments on commit 63d057b

Please sign in to comment.