Skip to content

Commit

Permalink
[SkyServe] sky serve CLI prototype (#2276)
Browse files Browse the repository at this point in the history
* Add service schema

* use new serve YAML

* change to qpm

* change to fix node

* refactor init of SkyServiceSpec

* change http example to new yaml format

* update default value of from_yaml_config and handle service in task

* Launching successfully

* use argument in controller & redirector

* resolve comments

* use qps instead

* raise when multiple task found

* change to qps

* introduce constants

* introduce constants & fix bugs

* add sky down

* add Services
No existing services. without STATUS (but with #healthy replica

* format

* add llama2 example

* add fields to service db

* status with replica information

* fix policy parsing bug

* add auth todo

* add replica status todo

* change cluster name prefix and order of the column

* minor fixes

* reorder status

* change name: controller --> control plane

* change name: middleware --> controller

* clean code

* rename default service name

* env vars

* add purge and skip identity check on serve controller

* upload filemounts and workdir to storage & enhance --purge
  • Loading branch information
cblmemo authored Jul 26, 2023
1 parent f8c53ac commit 18ba98f
Show file tree
Hide file tree
Showing 26 changed files with 1,196 additions and 128 deletions.
4 changes: 3 additions & 1 deletion sky/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from sky import clouds
from sky.clouds.service_catalog import list_accelerators
from sky.dag import Dag
from sky.execution import launch, exec, spot_launch # pylint: disable=redefined-builtin
from sky.execution import launch, exec, spot_launch, serve_up, serve_down # pylint: disable=redefined-builtin
from sky.resources import Resources
from sky.task import Task
from sky.optimizer import Optimizer, OptimizeTarget
Expand Down Expand Up @@ -64,6 +64,8 @@
'launch',
'exec',
'spot_launch',
'serve_up',
'serve_down',
# core APIs
'status',
'start',
Expand Down
58 changes: 58 additions & 0 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Util constants/functions for the backends."""
import base64
from datetime import datetime
import difflib
import enum
import getpass
import json
import os
import pathlib
import pickle
import re
import subprocess
import tempfile
Expand Down Expand Up @@ -37,6 +39,7 @@
from sky import skypilot_config
from sky import sky_logging
from sky import spot as spot_lib
from sky import serve as serve_lib
from sky import status_lib
from sky.backends import onprem_utils
from sky.skylet import constants
Expand Down Expand Up @@ -1326,6 +1329,10 @@ def generate_cluster_name():
return f'sky-{uuid.uuid4().hex[:4]}-{get_cleaned_username()}'


def generate_service_name():
return f'service-{uuid.uuid4().hex[:4]}'


def get_cleaned_username() -> str:
"""Cleans the current username to be used as part of a cluster name.
Expand Down Expand Up @@ -2408,6 +2415,57 @@ def _refresh_cluster(cluster_name):
return kept_records


def refresh_service_status(service: Optional[str]) -> List[Dict[str, Any]]:
if service is None:
service_records = global_user_state.get_services()
else:
service_record = global_user_state.get_service_from_name(service)
if service_record is None:
with ux_utils.print_exception_no_traceback():
raise ValueError(f'Service {service} does not exist.')
service_records = [service_record]
# TODO(tian): Make it run in parallel.
for record in service_records:
controller_cluster_name = record['controller_cluster_name']
endpoint = record['endpoint']
if not endpoint:
continue
# TODO(tian): Refactor: store ip and app_port separately.
controller_ip = endpoint.split(':')[0]
with requests.Session() as session:
try:
resp = session.get(
f'http://{controller_ip}:{serve_lib.CONTROL_PLANE_PORT}/control_plane/get_replica_nums',
timeout=5)
except requests.RequestException:
pass
else:
record.update(resp.json())
if record['num_healthy_replicas'] > 0:
record['status'] = status_lib.ServiceStatus.RUNNING
elif record['num_unhealthy_replicas'] > 0:
record['status'] = status_lib.ServiceStatus.REPLICA_INIT
global_user_state.add_or_update_service(**record)
if service is not None:
assert record['name'] == service
try:
resp = session.get(
f'http://{controller_ip}:{serve_lib.CONTROL_PLANE_PORT}/control_plane/get_replica_info',
timeout=5)
except requests.RequestException:
pass
else:
record['replica_info'] = resp.json()['replica_info']
decoded_info = []
for info in record['replica_info']:
decoded_info.append({
k: pickle.loads(base64.b64decode(v))
for k, v in info.items()
})
record['replica_info'] = decoded_info
return service_records


@typing.overload
def get_backend_from_handle(
handle: 'cloud_vm_ray_backend.CloudVmRayResourceHandle'
Expand Down
13 changes: 9 additions & 4 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from sky import optimizer
from sky import skypilot_config
from sky import spot as spot_lib
from sky import serve as serve_lib
from sky import status_lib
from sky import task as task_lib
from sky.data import data_utils
Expand Down Expand Up @@ -2892,8 +2893,9 @@ def _exec_code_on_head(
f'Failed to submit job {job_id}.',
stderr=stdout + stderr)

logger.info('Job submitted with Job ID: '
f'{style.BRIGHT}{job_id}{style.RESET_ALL}')
if not handle.cluster_name.startswith(serve_lib.CONTROLLER_PREFIX):
logger.info('Job submitted with Job ID: '
f'{style.BRIGHT}{job_id}{style.RESET_ALL}')

try:
if not detach_run:
Expand Down Expand Up @@ -2924,7 +2926,9 @@ def _exec_code_on_head(
'\nTo view the spot job dashboard:\t'
f'{backend_utils.BOLD}sky spot dashboard'
f'{backend_utils.RESET_BOLD}')
else:
elif not name.startswith(serve_lib.CONTROLLER_PREFIX):
# Skip logging for submit control plane & redirector jobs
# to controller
logger.info(f'{fore.CYAN}Job ID: '
f'{style.BRIGHT}{job_id}{style.RESET_ALL}'
'\nTo cancel the job:\t'
Expand Down Expand Up @@ -3039,7 +3043,8 @@ def _post_execute(self, handle: CloudVmRayResourceHandle,
fore = colorama.Fore
style = colorama.Style
name = handle.cluster_name
if name == spot_lib.SPOT_CONTROLLER_NAME or down:
if (name == spot_lib.SPOT_CONTROLLER_NAME or down or
name.startswith(serve_lib.CONTROLLER_PREFIX)):
return
stop_str = ('\nTo stop the cluster:'
f'\t{backend_utils.BOLD}sky stop {name}'
Expand Down
187 changes: 187 additions & 0 deletions sky/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,13 @@ def _complete_cluster_name(ctx: click.Context, param: click.Parameter,
return global_user_state.get_cluster_names_start_with(incomplete)


def _complete_service_name(ctx: click.Context, param: click.Parameter,
incomplete: str) -> List[str]:
"""Handle shell completion for service names."""
del ctx, param # Unused.
return global_user_state.get_service_names_start_with(incomplete)


def _complete_storage_name(ctx: click.Context, param: click.Parameter,
incomplete: str) -> List[str]:
"""Handle shell completion for storage names."""
Expand Down Expand Up @@ -3798,6 +3805,186 @@ def spot_dashboard(port: Optional[int]):
click.echo('Exiting.')


@cli.group(cls=_NaturalOrderGroup)
def serve():
"""SkyServe commands CLI."""
pass


@serve.command('up', cls=_DocumentedCodeCommand)
@click.argument('entrypoint',
required=True,
type=str,
**_get_shell_complete_args(_complete_file_name))
@click.option('--service',
'-s',
default=None,
type=str,
help='A service name. Unique for each service. If not provided, '
'provision a new service with an autogenerated name.')
@click.option('--yes',
'-y',
is_flag=True,
default=False,
required=False,
help='Skip confirmation prompt.')
def serve_up(
entrypoint: str,
service: Optional[str],
yes: bool,
):
"""Launches a SkyServe instance.
ENTRYPOINT must points to a valid YAML file.
Example:
.. code-block:: bash
sky serve up service.yaml
"""
if service is None:
# TODO(tian): Check service name is unique.
service = backend_utils.generate_service_name()

shell_splits = shlex.split(entrypoint)
yaml_file_provided = (len(shell_splits) == 1 and
(shell_splits[0].endswith('yaml') or
shell_splits[0].endswith('.yml')))
if not yaml_file_provided:
click.secho('ENTRYPOINT must points to a valid YAML file.', fg='red')
return

is_yaml = True
config: Optional[List[Dict[str, Any]]] = None
try:
with open(entrypoint, 'r') as f:
try:
config = list(yaml.safe_load_all(f))
if config:
# FIXME(zongheng): in a chain DAG YAML it only returns the
# first section. OK for downstream but is weird.
result = config[0]
else:
result = {}
if isinstance(result, str):
invalid_reason = (
'cannot be parsed into a valid YAML file. '
'Please check syntax.')
is_yaml = False
except yaml.YAMLError as e:
if yaml_file_provided:
logger.debug(e)
invalid_reason = ('contains an invalid configuration. '
' Please check syntax.')
is_yaml = False
except OSError:
entry_point_path = os.path.expanduser(entrypoint)
if not os.path.exists(entry_point_path):
invalid_reason = ('does not exist. Please check if the path'
' is correct.')
elif not os.path.isfile(entry_point_path):
invalid_reason = ('is not a file. Please check if the path'
' is correct.')
else:
invalid_reason = ('yaml.safe_load() failed. Please check if the'
' path is correct.')
is_yaml = False
if not is_yaml:
click.secho(
f'{entrypoint!r} looks like a yaml path but {invalid_reason}',
fg='red')
return

click.secho('Service from YAML spec: ', fg='yellow', nl=False)
click.secho(entrypoint, bold=True)
usage_lib.messages.usage.update_user_task_yaml(entrypoint)
dag = dag_utils.load_chain_dag_from_yaml(entrypoint)
if len(dag.tasks) > 1:
click.secho('Multiple tasks found in the YAML file.', fg='red')
return
task = dag.tasks[0]
if task.service is None:
click.secho('Service section not found in the YAML file.', fg='red')
return

if not yes:
prompt = f'Launching a new service {service}. Proceed?'
if prompt is not None:
click.confirm(prompt, default=True, abort=True, show_default=True)

sky.serve_up(task, service, entrypoint)


@serve.command('status', cls=_DocumentedCodeCommand)
@click.option('--all',
'-a',
default=False,
is_flag=True,
required=False,
help='Show all information in full.')
@click.argument('service',
required=False,
type=str,
**_get_shell_complete_args(_complete_service_name))
@usage_lib.entrypoint
# pylint: disable=redefined-builtin
def serve_status(all: bool, service: Optional[str]):
service_records = core.service_status(service)
click.echo(f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}Services'
f'{colorama.Style.RESET_ALL}')
status_utils.show_service_table(service_records, all)
if service is not None:
# If service not exist, we should already raise an error in
# core.service_status.
assert len(service_records) == 1, service_records
service_record = service_records[0]
if 'replica_info' not in service_record:
click.secho(f'Failed to refresh status of service: {service}.',
fg='red')
return
click.echo(
f'\n{colorama.Fore.CYAN}{colorama.Style.BRIGHT}Replicas of {service}'
f'{colorama.Style.RESET_ALL}')
status_utils.show_replica_table(service_record['replica_info'], all)


@serve.command('down', cls=_DocumentedCodeCommand)
@click.argument('service',
required=True,
**_get_shell_complete_args(_complete_service_name))
@click.option('--yes',
'-y',
is_flag=True,
default=False,
required=False,
help='Skip confirmation prompt.')
@click.option('--purge',
'-p',
is_flag=True,
default=False,
required=False,
help='Ignore errors (if any). ')
def serve_down(
service: str,
yes: bool,
purge: bool,
):
"""Stops a SkyServe instance.
Example:
.. code-block:: bash
sky serve down my-service
"""
if not yes:
prompt = f'Tearing down service {service}. Proceed?'
click.confirm(prompt, default=True, abort=True, show_default=True)

sky.serve_down(service, purge)


# ==============================
# Sky Benchmark CLIs
# ==============================
Expand Down
5 changes: 5 additions & 0 deletions sky/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ def status(cluster_names: Optional[Union[str, List[str]]] = None,
cluster_names=cluster_names)


@usage_lib.entrypoint
def service_status(service: Optional[str]) -> List[Dict[str, Any]]:
return backend_utils.refresh_service_status(service)


@usage_lib.entrypoint
def cost_report() -> List[Dict[str, Any]]:
# NOTE(dev): Keep the docstring consistent between the Python API and CLI.
Expand Down
Loading

0 comments on commit 18ba98f

Please sign in to comment.