Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spot] Add eager failover strategy #2234

Merged
merged 10 commits into from
Aug 3, 2023
20 changes: 16 additions & 4 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,11 +623,18 @@ class GangSchedulingStatus(enum.Enum):
GANG_FAILED = 1
HEAD_FAILED = 2

def __init__(self, log_dir: str, dag: 'dag.Dag',
def __init__(self,
log_dir: str,
dag: 'dag.Dag',
optimize_target: 'optimizer.OptimizeTarget',
requested_features: Set[clouds.CloudImplementationFeatures],
local_wheel_path: pathlib.Path, wheel_hash: str):
local_wheel_path: pathlib.Path,
wheel_hash: str,
blocked_resources: Optional[Iterable[
resources_lib.Resources]] = None):
self._blocked_resources: Set[resources_lib.Resources] = set()
if blocked_resources:
concretevitamin marked this conversation as resolved.
Show resolved Hide resolved
self._blocked_resources.update(blocked_resources)

self.log_dir = os.path.expanduser(log_dir)
self._dag = dag
Expand Down Expand Up @@ -2452,8 +2459,13 @@ def _provision(
# of optimization infinitely.
try:
provisioner = RetryingVmProvisioner(
self.log_dir, self._dag, self._optimize_target,
self._requested_features, local_wheel_path, wheel_hash)
self.log_dir,
self._dag,
self._optimize_target,
self._requested_features,
local_wheel_path,
wheel_hash,
blocked_resources=task.blocked_resources)
config_dict = provisioner.provision_with_retries(
task, to_provision_config, dryrun, stream_logs)
break
Expand Down
78 changes: 76 additions & 2 deletions sky/spot/recovery_strategy.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
"""The strategy to handle launching/recovery/termination of spot clusters."""
"""The strategy to handle launching/recovery/termination of spot clusters.

In the YAML file, the user can specify the strategy to use for spot jobs.

resources:
spot_recovery: AGGRESSIVE_FAILOVER
concretevitamin marked this conversation as resolved.
Show resolved Hide resolved
"""
import time
import traceback
import typing
from typing import Optional, Tuple

import sky
from sky import backends
from sky import exceptions
from sky import global_user_state
from sky import sky_logging
from sky import status_lib
from sky import backends
from sky.backends import backend_utils
from sky.skylet import job_lib
from sky.spot import spot_utils
Expand Down Expand Up @@ -444,3 +450,71 @@ def recover(self) -> float:
f'{self._MAX_RETRY_CNT} times.')

return job_submitted_at


class AggressiveFailoverStrategyExecutor(FailoverStrategyExecutor,
name='AGGRESSIVE_FAILOVER',
default=False):
"""Aggressive failover strategy
concretevitamin marked this conversation as resolved.
Show resolved Hide resolved

This strategy is an extension of the failover strategy. Instead of waiting
concretevitamin marked this conversation as resolved.
Show resolved Hide resolved
in the same region when the preemption happens, it immediately terminates
the cluster and relaunches it in a different regions. This is based on the
concretevitamin marked this conversation as resolved.
Show resolved Hide resolved
observation that the preemption is likely to happen again shortly in the
same region, so trying other regions first is more likely to get a longer
running cluster.
"""

def recover(self) -> float:
# 1. Terminate the current cluster
# 2. Launch the cluster without retrying the previously launched region
# 3. Launch the cluster with no cloud/region constraint or respect the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does "or respect the original user specification." mean? It seems like we should respect the original requirements.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to resources requirements. Thanks!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about:
2. Launch again by explicitly blocking the previously launched region (this will failover through the entire search space except the previously launched region)
3. (If step 2 failed) Retry forever: Launch again with no blocked locations (this will failover through the entire search space)

The entire search space is defined by the original task request, task.resources.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! Updated. Thanks!

# original user specification.

# Step 1
logger.debug('Terminating unhealthy spot cluster and '
'reset cloud region.')
terminate_cluster(self.cluster_name)

# Step 2
logger.debug('Relaunch the cluster skipping the previously launched '
'cloud/region.')
if self._launched_cloud_region is not None:
launched_cloud, launched_region = self._launched_cloud_region
task = self.dag.tasks[0]
resources = list(task.resources)[0]
concretevitamin marked this conversation as resolved.
Show resolved Hide resolved
task.blocked_resources = {
resources.copy(cloud=launched_cloud, region=launched_region)
}
# Not using self.launch to avoid the retry until up logic.
job_submitted_at = self._launch(raise_on_failure=False)
# Restore the original dag, i.e. reset the region constraint.
if job_submitted_at is not None:
return job_submitted_at
self._launched_cloud_region = None
terminate_cluster(self.cluster_name)

# Retry the entire block until the cluster is up, so that the ratio of
# the time spent in the current region and the time spent in the other
# region is consistent during the retry.
concretevitamin marked this conversation as resolved.
Show resolved Hide resolved
while True:
# Step 3
logger.debug('Relaunch the cluster without constraining to prior '
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
'cloud/region.')
# Not using self.launch to avoid the retry until up logic.
job_submitted_at = self._launch(max_retry=self._MAX_RETRY_CNT,
raise_on_failure=False)
if job_submitted_at is None:
concretevitamin marked this conversation as resolved.
Show resolved Hide resolved
# Failed to launch the cluster.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should think about who should set self._launched_cloud_region to None reliably. In _launch()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should let the _launch() to set the self._launched_cloud_region to None, because in that case we will not be able to control how many retries for exhausted failover without using the current region, before we start failover with the current region.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(discussed offline) Maybe we should put it in _launch() to make it the sole accessor.

if self.retry_until_up:
gap_seconds = self.RETRY_INIT_GAP_SECONDS
logger.info('Retrying to recover the spot cluster in '
f'{gap_seconds:.1f} seconds.')
time.sleep(gap_seconds)
continue
with ux_utils.print_exception_no_traceback():
raise exceptions.ResourcesUnavailableError(
f'Failed to recover the spot cluster after retrying '
f'{self._MAX_RETRY_CNT} times.')

return job_submitted_at
4 changes: 4 additions & 0 deletions sky/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def __init__(
# Advanced:
docker_image: Optional[str] = None,
event_callback: Optional[str] = None,
blocked_resources: Optional['resources_lib.Resources'] = None,
concretevitamin marked this conversation as resolved.
Show resolved Hide resolved
):
"""Initializes a Task.

Expand Down Expand Up @@ -194,6 +195,9 @@ def __init__(
self.estimated_outputs_size_gigabytes = None
# Default to CPUNode
self.resources = {sky.Resources()}
# Resources that this task cannot run on.
self.blocked_resources = blocked_resources

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a comment to explain the purpose of the blocked_resources attribute in the Task class. This will help developers understand its role and how it is used. [medium]


self.time_estimator_func: Optional[Callable[['sky.Resources'],
int]] = None
self.file_mounts: Optional[Dict[str, str]] = None
Expand Down