Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Respect the block list when failover #1585

Merged
merged 8 commits into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 75 additions & 57 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Backend: runs on cloud virtual machines, managed by Ray."""
import ast
import copy
import enum
import getpass
import inspect
Expand Down Expand Up @@ -574,9 +575,7 @@ class GangSchedulingStatus(enum.Enum):
def __init__(self, log_dir: str, dag: 'dag.Dag',
optimize_target: OptimizeTarget,
local_wheel_path: pathlib.Path, wheel_hash: str):
self._blocked_regions = set()
self._blocked_zones = set()
self._blocked_launchable_resources = set()
self._blocked_resources = set()

self.log_dir = os.path.expanduser(log_dir)
self._dag = dag
Expand All @@ -586,24 +585,11 @@ def __init__(self, log_dir: str, dag: 'dag.Dag',

colorama.init()

def _in_blocklist(self, cloud, region, zones):
if region.name in self._blocked_regions:
return True
# We do not keep track of zones in Azure and Local,
# as both clouds do not have zones.
if isinstance(cloud, (clouds.Azure, clouds.Local)):
return False
assert zones, (cloud, region, zones)
for zone in zones:
if zone.name not in self._blocked_zones:
return False
return True
def _update_blocklist_on_gcp_error(
self, launchable_resources: 'resources_lib.Resources', region,
zones, stdout, stderr):

def _clear_blocklist(self):
self._blocked_regions.clear()
self._blocked_zones.clear()

def _update_blocklist_on_gcp_error(self, region, zones, stdout, stderr):
del region # unused
style = colorama.Style
assert len(zones) == 1, zones
zone = zones[0]
Expand Down Expand Up @@ -638,17 +624,13 @@ def _update_blocklist_on_gcp_error(self, region, zones, stdout, stderr):
# This skip is only correct if we implement "first
# retry the region/zone of an existing cluster with the
# same name" correctly.
for r in clouds.GCP.regions_with_offering(
instance_type=None,
accelerators=None,
use_spot=False,
region=None,
zone=None):
self._blocked_regions.add(r.name)
self._blocked_resources.add(
launchable_resources.copy(region=None, zone=None))
else:
# Per region. Ex: Quota 'CPUS' exceeded. Limit: 24.0
# in region us-west1.
self._blocked_regions.add(region.name)
self._blocked_resources.add(
launchable_resources.copy(zone=None))
elif code in [
'ZONE_RESOURCE_POOL_EXHAUSTED',
'ZONE_RESOURCE_POOL_EXHAUSTED_WITH_DETAILS',
Expand All @@ -658,30 +640,34 @@ def _update_blocklist_on_gcp_error(self, region, zones, stdout, stderr):
# However, UNSUPPORTED_OPERATION is observed empirically
# when VM is preempted during creation. This seems to be
# not documented by GCP.
self._blocked_zones.add(zone.name)
self._blocked_resources.add(
launchable_resources.copy(zone=zone.name))
elif code in ['RESOURCE_NOT_READY']:
# This code is returned when the VM is still STOPPING.
self._blocked_zones.add(zone.name)
self._blocked_resources.add(
launchable_resources.copy(zone=zone.name))
elif code == 8:
# Error code 8 means TPU resources is out of
# capacity. Example:
# {'code': 8, 'message': 'There is no more capacity in the zone "europe-west4-a"; you can try in another zone where Cloud TPU Nodes are offered (see https://cloud.google.com/tpu/docs/regions) [EID: 0x1bc8f9d790be9142]'} # pylint: disable=line-too-long
self._blocked_zones.add(zone.name)
self._blocked_resources.add(
launchable_resources.copy(zone=zone.name))
else:
assert False, error
elif len(httperror_str) >= 1:
logger.info(f'Got {httperror_str[0]}')
if ('Requested disk size cannot be smaller than the image size'
in httperror_str[0]):
logger.info('Skipping all regions due to disk size issue.')
for r, _ in clouds.GCP.region_zones_provision_loop():
self._blocked_regions.add(r.name)
self._blocked_resources.add(
launchable_resources.copy(region=None, zone=None))
else:
# Parse HttpError for unauthorized regions. Example:
# googleapiclient.errors.HttpError: <HttpError 403 when requesting ... returned "Location us-east1-d is not found or access is unauthorized.". # pylint: disable=line-too-long
# Details: "Location us-east1-d is not found or access is
# unauthorized.">
self._blocked_zones.add(zone.name)
self._blocked_resources.add(
launchable_resources.copy(zone=zone.name))
else:
# No such structured error response found.
assert not exception_str, stderr
Expand All @@ -690,7 +676,8 @@ def _update_blocklist_on_gcp_error(self, region, zones, stdout, stderr):
# 'projects/<id>/zones/zone/acceleratorTypes/nvidia-tesla-v100'
# was not found.
logger.warning(f'Got \'resource not found\' in {zone.name}.')
self._blocked_zones.add(zone.name)
self._blocked_resources.add(
launchable_resources.copy(zone=zone.name))
else:
logger.info('====== stdout ======')
for s in stdout.split('\n'):
Expand All @@ -703,7 +690,9 @@ def _update_blocklist_on_gcp_error(self, region, zones, stdout, stderr):
raise RuntimeError('Errors occurred during provision; '
'check logs above.')

def _update_blocklist_on_aws_error(self, region, zones, stdout, stderr):
def _update_blocklist_on_aws_error(
self, launchable_resources: 'resources_lib.Resources', region,
zones, stdout, stderr):
style = colorama.Style
stdout_splits = stdout.split('\n')
stderr_splits = stderr.split('\n')
Expand Down Expand Up @@ -748,9 +737,13 @@ def _update_blocklist_on_aws_error(self, region, zones, stdout, stderr):
logger.warning(f'Got error(s) in {zones_str}:')
messages = '\n\t'.join(errors)
logger.warning(f'{style.DIM}\t{messages}{style.RESET_ALL}')
self._blocked_regions.add(region.name)
for zone in zones:
self._blocked_resources.add(
launchable_resources.copy(zone=zone.name))
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved

def _update_blocklist_on_azure_error(self, region, zones, stdout, stderr):
def _update_blocklist_on_azure_error(
self, launchable_resources: 'resources_lib.Resources', region,
zones, stdout, stderr):
del zones # Unused.
# The underlying ray autoscaler will try all zones of a region at once.
style = colorama.Style
Expand All @@ -777,12 +770,14 @@ def _update_blocklist_on_azure_error(self, region, zones, stdout, stderr):
messages = '\n\t'.join(errors)
logger.warning(f'{style.DIM}\t{messages}{style.RESET_ALL}')
if any('(ReadOnlyDisabledSubscription)' in s for s in errors):
for r in sky.Azure.regions():
self._blocked_regions.add(r.name)
self._blocked_resources.add(
resources_lib.Resources(cloud=clouds.Azure()))
else:
self._blocked_regions.add(region.name)
self._blocked_resources.add(launchable_resources.copy(zone=None))
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved

def _update_blocklist_on_local_error(self, region, zones, stdout, stderr):
def _update_blocklist_on_local_error(
self, launchable_resources: 'resources_lib.Resources', region,
zones, stdout, stderr):
del zones # Unused.
style = colorama.Style
stdout_splits = stdout.split('\n')
Expand All @@ -806,10 +801,12 @@ def _update_blocklist_on_local_error(self, region, zones, stdout, stderr):
logger.warning('Got error(s) on local cluster:')
messages = '\n\t'.join(errors)
logger.warning(f'{style.DIM}\t{messages}{style.RESET_ALL}')
self._blocked_regions.add(region.name)
self._blocked_resources.add(
launchable_resources.copy(region=region.name, zone=None))

def _update_blocklist_on_error(self, cloud, region, zones, stdout,
stderr) -> bool:
def _update_blocklist_on_error(
self, launchable_resources: 'resources_lib.Resources', region,
zones, stdout, stderr) -> bool:
"""Handles cloud-specific errors and updates the block list.

This parses textual stdout/stderr because we don't directly use the
Expand All @@ -821,11 +818,15 @@ def _update_blocklist_on_error(self, cloud, region, zones, stdout,
launched (e.g., due to VPC errors we have never sent the provision
request), False otherwise.
"""
assert launchable_resources.region == region.name, (
launchable_resources, region)
if stdout is None:
# Gang scheduling failure (head node is definitely up, but some
# workers' provisioning failed). Simply block the region.
# workers' provisioning failed). Simply block the zones.
assert stderr is None, stderr
self._blocked_regions.add(region.name)
for zone in zones:
self._blocked_resources.add(
launchable_resources.copy(zone=zone.name))
return False # definitely_no_nodes_launched

# TODO(zongheng): refactor into Cloud interface?
Expand All @@ -835,13 +836,14 @@ def _update_blocklist_on_error(self, cloud, region, zones, stdout,
clouds.GCP: self._update_blocklist_on_gcp_error,
clouds.Local: self._update_blocklist_on_local_error,
}
cloud = launchable_resources.cloud
cloud_type = type(cloud)
if cloud_type not in handlers:
raise NotImplementedError(
'Cloud {cloud} unknown, or has not added '
'support for parsing and handling provision failures.')
handler = handlers[cloud_type]
handler(region, zones, stdout, stderr)
handler(launchable_resources, region, zones, stdout, stderr)

stdout_splits = stdout.split('\n')
stderr_splits = stderr.split('\n')
Expand Down Expand Up @@ -1084,12 +1086,29 @@ def _retry_region_zones(self,
global_user_state.ClusterStatus.UP
]

self._clear_blocklist()
for region, zones in self._yield_region_zones(to_provision,
cluster_name,
cluster_exists):
if self._in_blocklist(to_provision.cloud, region, zones):
# Filter out zones that are blocked, if any.
            # This optimizes the provision loop by skipping zones that are
# indicated to be unavailable from previous provision attempts.
# It can happen for the provisioning on GCP, as the
# yield_region_zones will return zones from a region one by one,
# but the optimizer that does the filtering will not be involved
# until the next region.
filtered_zones = copy.deepcopy(zones)
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
for zone in zones:
for blocked_resources in self._blocked_resources:
if to_provision.copy(region=region.name,
zone=zone.name).should_be_blocked_by(
blocked_resources):
filtered_zones.remove(zone)
break
WoosukKwon marked this conversation as resolved.
Show resolved Hide resolved
if zones and not filtered_zones:
# Skip the region if all zones are blocked.
continue
zones = filtered_zones

if not zones:
# For Azure, zones is always an empty list.
zone_str = 'all zones'
Expand Down Expand Up @@ -1199,14 +1218,14 @@ def _retry_region_zones(self,
if status == self.GangSchedulingStatus.HEAD_FAILED:
# ray up failed for the head node.
definitely_no_nodes_launched = self._update_blocklist_on_error(
to_provision.cloud, region, zones, stdout, stderr)
to_provision, region, zones, stdout, stderr)
else:
# gang scheduling failed.
assert status == self.GangSchedulingStatus.GANG_FAILED, status
# The stdout/stderr of ray up is not useful here, since
# head node is successfully provisioned.
definitely_no_nodes_launched = self._update_blocklist_on_error(
to_provision.cloud,
to_provision,
region,
# Ignored and block region:
zones=None,
Expand Down Expand Up @@ -1650,7 +1669,7 @@ def provision_with_retries(
if not cluster_exists:
# Add failed resources to the blocklist, only when it
# is in fallback mode.
self._blocked_launchable_resources.add(to_provision)
self._blocked_resources.add(to_provision)
else:
logger.info('Retrying provisioning with requested resources '
f'{task.num_nodes}x {task.resources}')
Expand All @@ -1666,10 +1685,9 @@ def provision_with_retries(
# (otherwise will skip re-optimizing this task).
# TODO: set all remaining tasks' best_resources to None.
task.best_resources = None
self._dag = sky.optimize(
self._dag,
minimize=self._optimize_target,
blocked_launchable_resources=self._blocked_launchable_resources)
self._dag = sky.optimize(self._dag,
minimize=self._optimize_target,
blocked_resources=self._blocked_resources)
to_provision = task.best_resources
assert task in self._dag.tasks, 'Internal logic error.'
assert to_provision is not None, task
Expand Down
Loading