Skip to content

Commit

Permalink
[SkyServe] Final changes for v0 release (#2396)
Browse files Browse the repository at this point in the history
* add vicuna v1.5 example

* add replica ip in table; rename some vars

* warning if sky launch a service yaml

* format

* start progress after error log

* fix type name

* log format

* logger with skylogging format

* dump user app fail to control plane log

* ux

* add launched_at and service_yaml to local DB; delete cloud storage locally

* rapid bootstraping

* format

* move skyserve controller to separate section in sky status

* add hint to see detailed sky serve status

* restore example

* rename control plane to controller

* rename to hello_skyserve

* rename to hello_skyserve

* change port to align doc

* inline controller failed checking

* override user resources parameter

* format

* add some todos

* remove redundant return

* use handle to store information

* fix error const name

* simplify resources representation

* check cluster status earlier

* minor

* minor

* add back service section since we still need it in controller

* restore vicuna example

* print all info when use sky serve status -a

* better handling of unknown status

* add warning for status that cannot be sky.down

* minor comment fixes

* remove Tip: to reuse an existing cluster

* enable extra port on controller

* more detailed info when acc is None

* Apply suggestions from code review

Co-authored-by: Wei-Lin Chiang <[email protected]>

* add doc string

---------

Co-authored-by: Wei-Lin Chiang <[email protected]>
  • Loading branch information
cblmemo and infwinston authored Aug 15, 2023
1 parent c32e18d commit d6bd068
Show file tree
Hide file tree
Showing 24 changed files with 548 additions and 358 deletions.
4 changes: 2 additions & 2 deletions sky/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
from sky.data import StoreType
from sky.execution import exec # pylint: disable=redefined-builtin
from sky.execution import launch
from sky.execution import spot_launch
from sky.execution import serve_up
from sky.execution import serve_down
from sky.execution import serve_up
from sky.execution import spot_launch
from sky.optimizer import Optimizer
from sky.optimizer import OptimizeTarget
from sky.resources import Resources
Expand Down
71 changes: 32 additions & 39 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
from sky import exceptions
from sky import global_user_state
from sky import provision as provision_lib
from sky import serve as serve_lib
from sky import sky_logging
from sky import skypilot_config
from sky import spot as spot_lib
from sky import serve as serve_lib
from sky import status_lib
from sky.backends import onprem_utils
from sky.skylet import constants
Expand Down Expand Up @@ -2612,42 +2612,48 @@ def _service_status_from_replica_info(
# If one replica is READY, the service is READY.
if status2num[status_lib.ReplicaStatus.READY] > 0:
return status_lib.ServiceStatus.READY
if (status2num[status_lib.ReplicaStatus.FAILED] +
status2num[status_lib.ReplicaStatus.FAILED_CLEANUP] > 0):
if sum(status2num[status]
for status in status_lib.ReplicaStatus.failed_statuses()) > 0:
return status_lib.ServiceStatus.FAILED
return status_lib.ServiceStatus.REPLICA_INIT


def _check_controller_status_and_set_service_status(
service_name: str, cluster_name: str) -> Optional[str]:
cluster_record = global_user_state.get_cluster_from_name(cluster_name)
if (cluster_record is None or
cluster_record['status'] != status_lib.ClusterStatus.UP):
global_user_state.set_service_status(
service_name, status_lib.ServiceStatus.CONTRLLER_FAILED)
return f'Controller cluster {cluster_name!r} is not found or UP.'
return None


def _refresh_service_record_no_lock(
service_name: str) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
"""Refresh the service, and return the possibly updated record.
Args:
service_name: The name of the service.
Returns:
A tuple of a possibly updated record and an error message if any error
occurred when refreshing the service.
"""
record = global_user_state.get_service_from_name(service_name)
if record is None:
return None, None
service_handle: serve_lib.ServiceHandle = record['handle']

try:
check_network_connection()
except exceptions.NetworkError:
return record, 'Failed to refresh replica info due to network error.'

if not record['endpoint']:
if not service_handle.endpoint:
# Service controller is still initializing. Skipped refresh status.
return record, None

controller_cluster_name = record['controller_cluster_name']
handle = global_user_state.get_handle_from_cluster_name(
controller_cluster_name = service_handle.controller_cluster_name
cluster_record = global_user_state.get_cluster_from_name(
controller_cluster_name)
assert handle is not None
if (cluster_record is None or
cluster_record['status'] != status_lib.ClusterStatus.UP):
global_user_state.set_service_status(
service_name, status_lib.ServiceStatus.CONTRLLER_FAILED)
return record, (f'Controller cluster {controller_cluster_name!r} '
'is not found or UP.')

handle = cluster_record['handle']
backend = get_backend_from_handle(handle)
assert isinstance(backend, backends.CloudVmRayBackend)

Expand All @@ -2659,37 +2665,23 @@ def _refresh_service_record_no_lock(
stream_logs=False,
separate_stderr=True)
if returncode != 0:
# If we cannot get the latest info, there are two possibilities:
# 1. The controller is not in a healthy state;
# 2. The control plane process somehow not respond to the request.
# For the first case, we want to catch the error and set the service
# status to CONTROLLER_FAILED.
# TODO(tian): Since we disabled sky down the controller, we might could
# assert cluster status is UP here and remove this function.
msg = _check_controller_status_and_set_service_status(
record['name'], controller_cluster_name)
if msg is None:
msg = ('Failed to refresh replica info from the controller. '
f'Using the cached record. Reason: {stderr}')
return record, msg
return record, ('Failed to refresh replica info from the controller. '
f'Using the cached record. Reason: {stderr}')

latest_info = serve_lib.load_latest_info(latest_info_payload)
record['replica_info'] = latest_info['replica_info']
record['uptime'] = latest_info['uptime']
service_handle.replica_info = latest_info['replica_info']
service_handle.uptime = latest_info['uptime']

msg = None
# When the service is shutting down, there is a period of time which the
# control plane still responds to the request, and the replica is not
# controller still responds to the request, and the replica is not
# terminated, so the return value for _service_status_from_replica_info
# will still be READY, but we don't want change service status to READY.
if record['status'] != status_lib.ServiceStatus.SHUTTING_DOWN:
new_status = _service_status_from_replica_info(
record['status'] = _service_status_from_replica_info(
latest_info['replica_info'])
record['status'] = new_status

global_user_state.add_or_update_service(**record)

return record, msg
return record, None


def _refresh_service_record(
Expand Down Expand Up @@ -2731,6 +2723,7 @@ def _refresh_service(service_name: str) -> Optional[Dict[str, Any]]:
print(
f'{colorama.Fore.YELLOW}Error occurred when refreshing service '
f'{service_name}: {msg}{colorama.Style.RESET_ALL}')
progress.start()
progress.update(task, advance=1)
return record

Expand Down
8 changes: 4 additions & 4 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
from sky import optimizer
from sky import provision as provision_lib
from sky import resources as resources_lib
from sky import serve as serve_lib
from sky import sky_logging
from sky import skypilot_config
from sky import spot as spot_lib
from sky import serve as serve_lib
from sky import status_lib
from sky import task as task_lib
from sky.backends import backend_utils
Expand Down Expand Up @@ -3093,8 +3093,8 @@ def _exec_code_on_head(
f'{backend_utils.BOLD}sky spot dashboard'
f'{backend_utils.RESET_BOLD}')
elif not name.startswith(serve_lib.CONTROLLER_PREFIX):
# Skip logging for submit control plane & redirector jobs
# to controller
# Skip logging for submit controller & redirector jobs
# to skyserve controller cluster
logger.info(f'{fore.CYAN}Job ID: '
f'{style.BRIGHT}{job_id}{style.RESET_ALL}'
'\nTo cancel the job:\t'
Expand Down Expand Up @@ -4044,7 +4044,7 @@ def _check_existing_cluster(
f'{cluster_name!r} [Username: {ssh_user}].'
f'{colorama.Style.RESET_ALL}\n'
'Run `sky status` to see existing clusters.')
else:
elif not cluster_name.startswith(serve_lib.CONTROLLER_PREFIX):
logger.info(
f'{colorama.Fore.CYAN}Creating a new cluster: "{cluster_name}" '
f'[{task.num_nodes}x {to_provision}].'
Expand Down
84 changes: 50 additions & 34 deletions sky/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@
from sky import core
from sky import exceptions
from sky import global_user_state
from sky import serve as serve_lib
from sky import sky_logging
from sky import spot as spot_lib
from sky import serve as serve_lib
from sky import status_lib
from sky.backends import backend_utils
from sky.backends import onprem_utils
Expand Down Expand Up @@ -1420,6 +1420,14 @@ def launch(
with ux_utils.print_exception_no_traceback():
raise ValueError(f'{backend_name} backend is not supported.')

if task.service is not None:
logger.info(
f'{colorama.Fore.YELLOW}Service section will be ignored when using '
f'`sky launch`. {colorama.Style.RESET_ALL}\n{colorama.Fore.YELLOW}'
'To spin up a service, use SkyServe CLI: '
f'{colorama.Style.RESET_ALL}{colorama.Style.BRIGHT}sky serve up'
f'{colorama.Style.RESET_ALL}')

_launch_with_confirm(task,
backend,
cluster,
Expand Down Expand Up @@ -1729,12 +1737,22 @@ def status(all: bool, refresh: bool, show_spot_jobs: bool, clusters: List[str]):
refresh=refresh)
nonreserved_cluster_records = []
reserved_clusters = []
# TODO(tian): Rename this variable if other reserved prefix are added.
skyserve_controllers = []
for cluster_record in cluster_records:
cluster_name = cluster_record['name']
if cluster_name in backend_utils.SKY_RESERVED_CLUSTER_NAMES:
reserved_clusters.append(cluster_record)
else:
nonreserved_cluster_records.append(cluster_record)
is_skyserve_controller = False
for prefix in backend_utils.SKY_RESERVED_CLUSTER_PREFIXES:
if cluster_name.startswith(prefix):
is_skyserve_controller = True
break
if is_skyserve_controller:
skyserve_controllers.append(cluster_record)
else:
nonreserved_cluster_records.append(cluster_record)
local_clusters = onprem_utils.check_and_get_local_clusters(
suppress_error=True)

Expand All @@ -1744,6 +1762,14 @@ def status(all: bool, refresh: bool, show_spot_jobs: bool, clusters: List[str]):
status_utils.show_local_status_table(local_clusters)

hints = []
if skyserve_controllers:
click.echo(f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}\n'
f'SkyServe Controllers{colorama.Style.RESET_ALL}')
status_utils.show_status_table(skyserve_controllers, all)
hints.append(
f'* To see detailed service status: {colorama.Style.BRIGHT}'
f'sky serve status{colorama.Style.RESET_ALL}')

if show_spot_jobs:
click.echo(f'\n{colorama.Fore.CYAN}{colorama.Style.BRIGHT}'
f'Managed spot jobs{colorama.Style.RESET_ALL}')
Expand Down Expand Up @@ -4014,30 +4040,22 @@ def serve_up(
raise ValueError(
'Specifying ports in resources is not allowed. SkyServe will '
'use the port specified in the service section.')
return

controller_resources_config = copy.copy(serve_lib.CONTROLLER_RESOURCES)
if task.service.controller_resources is not None:
controller_resources_config = task.service.controller_resources
else:
controller_resources_config = serve_lib.CONTROLLER_RESOURCES
controller_resources_config.update(task.service.controller_resources)
try:
controller_resources = sky.Resources.from_yaml_config(
controller_resources_config)
except ValueError as e:
raise ValueError(
'Encountered error when parsing controller resources') from e
if controller_resources.ports is not None:
with ux_utils.print_exception_no_traceback():
raise ValueError(
'Cannot specify ports in controller resources. SkyServe '
'will use the port specified in the service section.')
return

click.secho('Service Spec:', fg='cyan')
click.echo(task.service)

dummy_controller_task = sky.Task().set_resources(controller_resources)
click.secho('The controller will use the following resources:', fg='cyan')
click.secho('The controller will use the following resource:', fg='cyan')
with sky.Dag() as dag:
dag.add(dummy_controller_task)
sky.optimize(dag)
Expand Down Expand Up @@ -4083,12 +4101,12 @@ def serve_status(all: bool, service_name: Optional[str]):
- ``CONTROLLER_INIT``: The controller is initializing.
- ``REPLICA_INIT``: The controller provisioning have succeeded; control
plane and redirector is alive, and there are no available replicas for
- ``REPLICA_INIT``: The controller provisioning have succeeded; controller
and redirector process is alive, and there are no available replicas for
now. This also indicates that no replica failure has been detected.
- ``CONTROLLER_FAILED``: The controller failed to start or in an abnormal
state; or the control plane and redirector is not alive.
state; or the controller and redirector process is not alive.
- ``READY``: The controller is ready to serve requests. This means that
at least one replica have passed the readiness probe.
Expand Down Expand Up @@ -4159,7 +4177,8 @@ def serve_status(all: bool, service_name: Optional[str]):
f'Replicas{colorama.Style.RESET_ALL}')
replica_infos = []
for service_record in service_records:
for replica_record in service_record['replica_info']:
service_handle: serve_lib.ServiceHandle = service_record['handle']
for replica_record in service_handle.replica_info:
replica_record['service_name'] = service_record['name']
replica_infos.append(replica_record)
status_utils.show_replica_table(replica_infos, all)
Expand Down Expand Up @@ -4193,7 +4212,7 @@ def serve_down(
yes: bool,
purge: bool,
):
"""Tear down service(s).
"""Teardown service(s).
SERVICE_NAMES is the name of the service (or glob pattern) to tear down. If
both SERVICE_NAMES and ``--all`` are supplied, the latter takes precedence.
Expand Down Expand Up @@ -4286,11 +4305,11 @@ def _down_service(name: str):
default=True,
help=('Follow the logs of the job. [default: --follow] '
'If --no-follow is specified, print the log so far and exit.'))
@click.option('--control-plane',
@click.option('--controller',
is_flag=True,
default=False,
required=False,
help='Show the control plane logs of this service.')
help='Show the controller logs of this service.')
@click.option('--redirector',
is_flag=True,
default=False,
Expand All @@ -4305,7 +4324,7 @@ def _down_service(name: str):
def serve_logs(
service_name: str,
follow: bool,
control_plane: bool,
controller: bool,
redirector: bool,
replica_id: Optional[int],
):
Expand All @@ -4315,8 +4334,8 @@ def serve_logs(
.. code-block:: bash
# Tail the control plane logs of a service
sky serve logs --control-plane [SERVICE_ID]
# Tail the controller logs of a service
sky serve logs --controller [SERVICE_ID]
\b
# Print the redirector logs so far and exit
sky serve logs --redirector --no-follow [SERVICE_ID]
Expand All @@ -4325,22 +4344,19 @@ def serve_logs(
sky serve logs [SERVICE_ID] 1
"""
have_replica_id = replica_id is not None
if (control_plane + redirector + have_replica_id) != 1:
click.secho(
'Only one of --control-plane, --redirector, --replica-id '
'can be specified. See `sky serve logs --help` for more '
'information.',
fg='red')
return
if (controller + redirector + have_replica_id) != 1:
raise click.UsageError(
'One and only one of --controller, --redirector, '
'[REPLICA_ID] can be specified.')
service_record = global_user_state.get_service_from_name(service_name)
if service_record is None:
click.secho(f'Service {service_name!r} not found.', fg='red')
return
controller_name = service_record['controller_cluster_name']
if control_plane:
core.tail_logs(controller_name, job_id=1, follow=follow)
controller_cluster_name = service_record['handle'].controller_cluster_name
if controller:
core.tail_logs(controller_cluster_name, job_id=1, follow=follow)
elif redirector:
core.tail_logs(controller_name, job_id=2, follow=follow)
core.tail_logs(controller_cluster_name, job_id=2, follow=follow)
else:
core.serve_tail_logs(service_record, replica_id, follow=follow)

Expand Down
2 changes: 1 addition & 1 deletion sky/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def serve_tail_logs(service_record: Dict[str, Any], replica_id: int,
with ux_utils.print_exception_no_traceback():
raise ValueError(f'Service {service_name!r}\'s controller failed. '
'Cannot tail logs.')
controller_cluster_name = service_record['controller_cluster_name']
controller_cluster_name = service_record['handle'].controller_cluster_name
handle = global_user_state.get_handle_from_cluster_name(
controller_cluster_name)
if handle is None:
Expand Down
Loading

0 comments on commit d6bd068

Please sign in to comment.