diff --git a/sky/provision/gcp/instance_utils.py b/sky/provision/gcp/instance_utils.py index 7654795e096..dde0918274d 100644 --- a/sky/provision/gcp/instance_utils.py +++ b/sky/provision/gcp/instance_utils.py @@ -1023,6 +1023,8 @@ def filter( # Return empty dict instead of raising exception to not break. if 'is not found or access is unauthorized.' in str(e): return {} + if 'Permission \'tpu.nodes.list\' denied on' in str(e): + return {} logger.debug(f'filter: googleapiclient.errors.HttpError: {e}') raise diff --git a/sky/serve/autoscalers.py b/sky/serve/autoscalers.py index e059adbd608..d533df382cc 100644 --- a/sky/serve/autoscalers.py +++ b/sky/serve/autoscalers.py @@ -75,6 +75,10 @@ def __init__(self, service_name: str, # Target number of replicas is initialized to min replicas self.target_num_replicas: int = spec.min_replicas self.latest_version: int = constants.INITIAL_VERSION + # The latest_version_ever_ready should be smaller than the + # latest_version, so we can fail early if the initial version got + # unrecoverable failure. 
+ self.latest_version_ever_ready: int = self.latest_version - 1 self.update_mode = serve_utils.DEFAULT_UPDATE_MODE def update_version(self, version: int, spec: 'service_spec.SkyServiceSpec', @@ -113,13 +117,25 @@ def from_spec(cls, service_name: str, else: return RequestRateAutoscaler(service_name, spec) + def _dump_dynamic_states(self) -> Dict[str, Any]: + """Dump dynamic states from autoscaler.""" + raise NotImplementedError + def dump_dynamic_states(self) -> Dict[str, Any]: """Dump dynamic states from autoscaler.""" + states = {'latest_version_ever_ready': self.latest_version_ever_ready} + states.update(self._dump_dynamic_states()) + return states + + def _load_dynamic_states(self, dynamic_states: Dict[str, Any]) -> None: + """Load dynamic states to autoscaler.""" raise NotImplementedError def load_dynamic_states(self, dynamic_states: Dict[str, Any]) -> None: """Load dynamic states to autoscaler.""" - raise NotImplementedError + self.latest_version_ever_ready = dynamic_states.pop( + 'latest_version_ever_ready', constants.INITIAL_VERSION) + self._load_dynamic_states(dynamic_states) class RequestRateAutoscaler(Autoscaler): @@ -376,12 +392,24 @@ def evaluate_scaling( override dict. Active migration could require returning both SCALE_UP and SCALE_DOWN. """ + latest_replicas: List['replica_managers.ReplicaInfo'] = [] latest_nonterminal_replicas: List['replica_managers.ReplicaInfo'] = [] for info in replica_infos: if info.version == self.latest_version: + latest_replicas.append(info) if not info.is_terminal: latest_nonterminal_replicas.append(info) + if info.is_ready: + self.latest_version_ever_ready = self.latest_version + if self.latest_version_ever_ready < self.latest_version: + for info in latest_replicas: + if info.status_property.unrecoverable_failure(): + # Stop scaling if one of the replicas of the latest + # version failed; it is likely that a fatal error + # happened to the user application and may lead to an + # infinite termination and restart.
+ return [] self._set_target_num_replica_with_hysteresis() @@ -433,12 +461,12 @@ def evaluate_scaling( logger.info('No scaling needed.') return scaling_options - def dump_dynamic_states(self) -> Dict[str, Any]: + def _dump_dynamic_states(self) -> Dict[str, Any]: return { 'request_timestamps': self.request_timestamps, } - def load_dynamic_states(self, dynamic_states: Dict[str, Any]) -> None: + def _load_dynamic_states(self, dynamic_states: Dict[str, Any]) -> None: if 'request_timestamps' in dynamic_states: self.request_timestamps = dynamic_states.pop('request_timestamps') if dynamic_states: diff --git a/sky/serve/replica_managers.py b/sky/serve/replica_managers.py index e4c7bf0ba32..70e5ba2c6dd 100644 --- a/sky/serve/replica_managers.py +++ b/sky/serve/replica_managers.py @@ -235,52 +235,59 @@ class ReplicaStatusProperty: sky_launch_status: Optional[ProcessStatus] = None user_app_failed: bool = False service_ready_now: bool = False - # None means readiness probe is not executed yet; + # None means readiness probe is not succeeded yet; # -1 means the initial delay seconds is exceeded. first_ready_time: Optional[float] = None # None means sky.down is not called yet. sky_down_status: Optional[ProcessStatus] = None + # Whether the termination is caused by autoscaler's decision + is_scale_down: bool = False # The replica's spot instance was preempted. preempted: bool = False - def is_scale_down_succeeded(self, initial_delay_seconds: int) -> bool: + def remove_terminated_replica(self) -> bool: """Whether to remove the replica record from the replica table. If not, the replica will stay in the replica table permanently to notify the user that something is wrong with the user code / setup. """ - if self.sky_launch_status == ProcessStatus.INTERRUPTED: - return True - if self.sky_launch_status != ProcessStatus.SUCCEEDED: - # sky_launch_status == RUNNING: a scale down happened before - # the sky.launch finished. 
- return self.sky_launch_status != ProcessStatus.FAILED - if self.sky_down_status != ProcessStatus.SUCCEEDED: + return self.is_scale_down + + def unrecoverable_failure(self) -> bool: + """Whether the replica fails and cannot be recovered. + + Autoscaler should stop scaling if any of the replica has unrecoverable + failure, e.g., the user app fails before the service endpoint being + ready for the current version. + """ + replica_status = self.to_replica_status() + logger.info( + 'Check replica unrecorverable: first_ready_time ' + f'{self.first_ready_time}, user_app_failed {self.user_app_failed}, ' + f'status {replica_status}') + if replica_status not in serve_state.ReplicaStatus.terminal_statuses(): return False - if self.preempted: - return True - if (self.first_ready_time is not None and - time.time() - self.first_ready_time > initial_delay_seconds): - # If the service is up for more than `initial_delay_seconds`, - # we assume there is no bug in the user code and the scale down - # is successful, thus enabling the controller to remove the - # replica from the replica table and auto restart the replica. - # Here we assume that initial_delay_seconds is larger than - # consecutive_failure_threshold_seconds, so if a replica is not - # teardown for initial_delay_seconds, it is safe to assume that - # it is UP for initial_delay_seconds. - # For replica with a failed sky.launch, it is likely due to some - # misconfigured resources, so we don't want to auto restart it. - # For replica with a failed sky.down, we cannot restart it since - # otherwise we will have a resource leak. - return True + if self.first_ready_time is not None: + if self.first_ready_time >= 0: + # If the service is ever up, we assume there is no bug in the + # user code and the scale down is successful, thus enabling the + # controller to remove the replica from the replica table and + # auto restart the replica. 
+ # For replica with a failed sky.launch, it is likely due to some + # misconfigured resources, so we don't want to auto restart it. + # For replica with a failed sky.down, we cannot restart it since + # otherwise we will have a resource leak. + return False + else: + # If the initial delay exceeded, it is likely the service is not + # recoverable. + return True if self.user_app_failed: - return False - if self.first_ready_time is None: return True - if not self.service_ready_now: - return False - return self.first_ready_time >= 0.0 + # TODO(zhwu): launch failures not related to resource unavailability + # should be considered as unrecoverable failure. (refer to + # `spot.recovery_strategy.StrategyExecutor::_launch`) + return False def should_track_service_status(self) -> bool: """Should we track the status of the replica. @@ -333,17 +340,17 @@ def to_replica_status(self) -> serve_state.ReplicaStatus: return serve_state.ReplicaStatus.FAILED if self.sky_launch_status == ProcessStatus.FAILED: # sky.launch failed - return serve_state.ReplicaStatus.FAILED + return serve_state.ReplicaStatus.FAILED_PROVISION if self.first_ready_time is None: # readiness probe is not executed yet, but a scale down is # triggered. return serve_state.ReplicaStatus.SHUTTING_DOWN if self.first_ready_time == -1: # initial delay seconds exceeded - return serve_state.ReplicaStatus.FAILED + return serve_state.ReplicaStatus.FAILED_INITIAL_DELAY if not self.service_ready_now: # Max continuous failure exceeded - return serve_state.ReplicaStatus.FAILED + return serve_state.ReplicaStatus.FAILED_PROBING # This indicate it is a scale_down with correct teardown. # Should have been cleaned from the replica table. 
return serve_state.ReplicaStatus.UNKNOWN @@ -641,8 +648,11 @@ def scale_up(self, self._launch_replica(self._next_replica_id, resources_override) self._next_replica_id += 1 - def _terminate_replica(self, replica_id: int, sync_down_logs: bool, - replica_drain_delay_seconds: int) -> None: + def _terminate_replica(self, + replica_id: int, + sync_down_logs: bool, + replica_drain_delay_seconds: int, + is_scale_down: bool = False) -> None: if replica_id in self._launch_process_pool: info = serve_state.get_replica_info_from_id(self._service_name, @@ -725,6 +735,7 @@ def _download_and_stream_logs(info: ReplicaInfo): args=(info.cluster_name, replica_drain_delay_seconds), ) info.status_property.sky_down_status = ProcessStatus.RUNNING + info.status_property.is_scale_down = is_scale_down serve_state.add_or_update_replica(self._service_name, replica_id, info) p.start() self._down_process_pool[replica_id] = p @@ -733,7 +744,8 @@ def scale_down(self, replica_id: int) -> None: self._terminate_replica( replica_id, sync_down_logs=False, - replica_drain_delay_seconds=_DEFAULT_DRAIN_SECONDS) + replica_drain_delay_seconds=_DEFAULT_DRAIN_SECONDS, + is_scale_down=True) def _handle_preemption(self, info: ReplicaInfo) -> bool: """Handle preemption of the replica if any error happened. @@ -774,7 +786,8 @@ def _handle_preemption(self, info: ReplicaInfo) -> bool: info) self._terminate_replica(info.replica_id, sync_down_logs=False, - replica_drain_delay_seconds=0) + replica_drain_delay_seconds=0, + is_scale_down=True) return True ################################# @@ -859,12 +872,10 @@ def _refresh_process_pool(self) -> None: # initial_delay_seconds is not supported. We should add it # later when we support `sky serve update`. removal_reason = None - if info.status_property.is_scale_down_succeeded( - self._get_initial_delay_seconds(info.version)): - # This means the cluster is deleted due to - # a scale down or the cluster is recovering - # from preemption. 
Delete the replica info - # so it won't count as a replica. + if info.status_property.is_scale_down: + # This means the cluster is deleted due to an autoscaler + # decision or the cluster is recovering from preemption. + # Delete the replica info so it won't count as a replica. if info.status_property.preempted: removal_reason = 'for preemption recovery' else: diff --git a/sky/serve/serve_state.py b/sky/serve/serve_state.py index 4a8512f8e20..df2b3f58345 100644 --- a/sky/serve/serve_state.py +++ b/sky/serve/serve_state.py @@ -98,9 +98,18 @@ class ReplicaStatus(enum.Enum): # The replica VM is being shut down. i.e., the `sky down` is still running. SHUTTING_DOWN = 'SHUTTING_DOWN' - # The replica VM is once failed and has been deleted. + # The replica fails due to user's run/setup. FAILED = 'FAILED' + # The replica fails due to initial delay exceeded. + FAILED_INITIAL_DELAY = 'FAILED_INITIAL_DELAY' + + # The replica fails due to healthiness check. + FAILED_PROBING = 'FAILED_PROBING' + + # The replica fails during launching + FAILED_PROVISION = 'FAILED_PROVISION' + # `sky.down` failed during service teardown. # This could mean resource leakage. 
# TODO(tian): This status should be removed in the future, at which point @@ -115,14 +124,15 @@ class ReplicaStatus(enum.Enum): @classmethod def failed_statuses(cls) -> List['ReplicaStatus']: - return [cls.FAILED, cls.FAILED_CLEANUP, cls.UNKNOWN] + return [ + cls.FAILED, cls.FAILED_CLEANUP, cls.FAILED_INITIAL_DELAY, + cls.FAILED_PROBING, cls.FAILED_PROVISION, cls.UNKNOWN + ] @classmethod def terminal_statuses(cls) -> List['ReplicaStatus']: - return [ - cls.SHUTTING_DOWN, cls.FAILED, cls.FAILED_CLEANUP, cls.PREEMPTED, - cls.UNKNOWN - ] + return [cls.SHUTTING_DOWN, cls.PREEMPTED, cls.UNKNOWN + ] + cls.failed_statuses() @classmethod def scale_down_decision_order(cls) -> List['ReplicaStatus']: @@ -145,6 +155,9 @@ def colored_str(self) -> str: ReplicaStatus.NOT_READY: colorama.Fore.YELLOW, ReplicaStatus.SHUTTING_DOWN: colorama.Fore.MAGENTA, ReplicaStatus.FAILED: colorama.Fore.RED, + ReplicaStatus.FAILED_INITIAL_DELAY: colorama.Fore.RED, + ReplicaStatus.FAILED_PROBING: colorama.Fore.RED, + ReplicaStatus.FAILED_PROVISION: colorama.Fore.RED, ReplicaStatus.FAILED_CLEANUP: colorama.Fore.RED, ReplicaStatus.PREEMPTED: colorama.Fore.MAGENTA, ReplicaStatus.UNKNOWN: colorama.Fore.RED, diff --git a/sky/serve/serve_utils.py b/sky/serve/serve_utils.py index 20bc3a34ddb..0814441eb79 100644 --- a/sky/serve/serve_utils.py +++ b/sky/serve/serve_utils.py @@ -715,7 +715,7 @@ def _get_replicas(service_record: Dict[str, Any]) -> str: if info['status'] == serve_state.ReplicaStatus.READY: ready_replica_num += 1 # TODO(MaoZiming): add a column showing failed replicas number. 
- if info['status'] != serve_state.ReplicaStatus.FAILED: + if info['status'] not in serve_state.ReplicaStatus.failed_statuses(): total_replica_num += 1 return f'{ready_replica_num}/{total_replica_num}' diff --git a/tests/skyserve/failures/initial_delay.yaml b/tests/skyserve/failures/initial_delay.yaml new file mode 100644 index 00000000000..dd2749a947d --- /dev/null +++ b/tests/skyserve/failures/initial_delay.yaml @@ -0,0 +1,12 @@ +service: + readiness_probe: + path: /health + initial_delay_seconds: 10 + replicas: 2 + +resources: + cpus: 2 + ports: 8081 + +run: | + sleep 1000 diff --git a/tests/skyserve/failures/probing.py b/tests/skyserve/failures/probing.py new file mode 100644 index 00000000000..4767e7ebfd2 --- /dev/null +++ b/tests/skyserve/failures/probing.py @@ -0,0 +1,38 @@ +import argparse +import http.server +import socketserver + + +class MyHttpRequestHandler(http.server.SimpleHTTPRequestHandler): + + def do_GET(self): + # Return 200 for all paths + # Therefore, readiness_probe will return 200 at path '/health' + self.send_response(200) + self.send_header('Content-type', 'text/html') + self.end_headers() + html = """ + +
+