diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index ae0a28b52ed..c7c31960482 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -11,4 +11,4 @@ Tested (run the relevant ones): - [ ] Any manual or new tests for this PR (please specify below) - [ ] All smoke tests: `pytest tests/test_smoke.py` - [ ] Relevant individual smoke tests: `pytest tests/test_smoke.py::test_fill_in_the_name` -- [ ] Backward compatibility tests: `bash tests/backward_comaptibility_tests.sh` +- [ ] Backward compatibility tests: `conda deactivate; bash -i tests/backward_compatibility_tests.sh` diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 87d8cea9f16..bf84bea4d50 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -27,7 +27,7 @@ jobs: - tests/test_optimizer_random_dag.py - tests/test_storage.py - tests/test_wheels.py - - tests/test_spot_serve.py + - tests/test_jobs_and_serve.py - tests/test_yaml_parser.py runs-on: ubuntu-latest steps: diff --git a/docs/source/_static/custom.js b/docs/source/_static/custom.js index 00683c82f6b..11affaf4c43 100644 --- a/docs/source/_static/custom.js +++ b/docs/source/_static/custom.js @@ -26,6 +26,7 @@ document.addEventListener('DOMContentLoaded', () => { // New items: const newItems = [ { selector: '.caption-text', text: 'SkyServe: Model Serving' }, + { selector: '.toctree-l1 > a', text: 'Managed Jobs' }, { selector: '.toctree-l1 > a', text: 'Running on Kubernetes' }, { selector: '.toctree-l1 > a', text: 'DBRX (Databricks)' }, { selector: '.toctree-l1 > a', text: 'Ollama' }, diff --git a/docs/source/docs/index.rst b/docs/source/docs/index.rst index 412e3284372..4892e869d3c 100644 --- a/docs/source/docs/index.rst +++ b/docs/source/docs/index.rst @@ -121,7 +121,7 @@ Contents :maxdepth: 1 :caption: Running Jobs - ../examples/spot-jobs + ../examples/managed-jobs ../reference/job-queue ../examples/auto-failover ../reference/kubernetes/index @@ -139,7 +139,7 @@ Contents :maxdepth: 1 :caption: Cutting Cloud Costs - ../examples/spot-jobs + Managed Spot Jobs <../examples/spot-jobs> ../reference/auto-stop ../reference/benchmark/index diff --git a/docs/source/examples/managed-jobs.rst b/docs/source/examples/managed-jobs.rst new file mode 100644 index 00000000000..ba449c1f087 --- /dev/null +++ b/docs/source/examples/managed-jobs.rst @@ -0,0 +1,465 @@ +.. _managed-jobs: + +Managed Jobs +============ + +.. tip:: + + This feature is great for scaling out: running a single job for long durations, or running many jobs (pipelines). + +SkyPilot supports **managed jobs**, which can automatically recover from any spot preemptions or hardware failures. +It can be used in three modes: + +#. :ref:`Managed Spot Jobs `: Jobs run on auto-recovering spot instances. This can **save significant costs** (e.g., up to 70\% for GPU VMs) by making preemptible spot instances useful for long-running jobs. +#. :ref:`On-demand `: Jobs run on auto-recovering on-demand instances. This is useful for jobs that require guaranteed resources. +#. :ref:`Pipelines `: Run pipelines that contain multiple tasks (which can have different resource requirements and ``setup``/``run`` commands). This is useful for running a sequence of tasks that depend on each other, e.g., data processing, training a model, and then running inference on it. + + +.. _spot-jobs: + +Managed Spot Jobs +----------------- + +SkyPilot automatically finds available spot resources across regions and clouds to maximize availability. +Any spot preemptions are automatically handled by SkyPilot without user intervention. + +Here is an example of a BERT training job failing over different regions across AWS and GCP. + +.. image:: https://i.imgur.com/Vteg3fK.gif + :width: 600 + :alt: GIF for BERT training on Spot V100 + +.. image:: ../images/spot-training.png + :width: 600 + :alt: Static plot, BERT training on Spot V100 + +To use managed spot jobs, there are two requirements: + +#. :ref:`Job YAML `: Managed Spot requires a YAML to describe the job, tested with :code:`sky launch`. +#. :ref:`Checkpointing ` (optional): For job recovery due to preemptions, the user application code can checkpoint its progress periodically to a :ref:`mounted cloud bucket `. The program can reload the latest checkpoint when restarted. + + +.. _job-yaml: + +Job YAML +~~~~~~~~ + +To launch a managed job, you can simply reuse your job YAML (recommended to test it with :code:`sky launch` first). +For example, we found the BERT fine-tuning YAML works with :code:`sky launch`, and want to +launch it with SkyPilot managed spot jobs. + +We can launch it with the following: + +.. code-block:: console + + $ sky jobs launch -n bert-qa bert_qa.yaml + + +.. code-block:: yaml + + # bert_qa.yaml + name: bert-qa + + resources: + accelerators: V100:1 + # Use spot instances to save cost. + use_spot: true + + # Assume your working directory is under `~/transformers`. + # To make this example work, please run the following command: + # git clone https://github.com/huggingface/transformers.git ~/transformers -b v4.30.1 + workdir: ~/transformers + + setup: | + # Fill in your wandb key: copy from https://wandb.ai/authorize + # Alternatively, you can use `--env WANDB_API_KEY=$WANDB_API_KEY` + # to pass the key in the command line, during `sky spot launch`. + echo export WANDB_API_KEY=[YOUR-WANDB-API-KEY] >> ~/.bashrc + + pip install -e . + cd examples/pytorch/question-answering/ + pip install -r requirements.txt torch==1.12.1+cu113 --extra-index-url https://download.pytorch.org/whl/cu113 + pip install wandb + + run: | + cd ./examples/pytorch/question-answering/ + python run_qa.py \ + --model_name_or_path bert-base-uncased \ + --dataset_name squad \ + --do_train \ + --do_eval \ + --per_device_train_batch_size 12 \ + --learning_rate 3e-5 \ + --num_train_epochs 50 \ + --max_seq_length 384 \ + --doc_stride 128 \ + --report_to wandb + + +.. note:: + + :ref:`workdir ` and :ref:`file mounts with local files ` will be automatically uploaded to a + :ref:`cloud bucket `. The bucket will be created during the job running time, and cleaned up after the job + finishes. + +SkyPilot will launch and start monitoring the job. When a spot preemption or any machine failure happens, SkyPilot will automatically +search for resources across regions and clouds to re-launch the job. + +In this example, the job will be restarted from scratch after each preemption recovery. +To resume the job from previous states, user's application needs to implement checkpointing and recovery. + + +.. _checkpointing: + +Checkpointing and Recovery +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +To allow job recovery, a cloud bucket is typically needed to store the job's states (e.g., model checkpoints). +Below is an example of mounting a bucket to :code:`/checkpoint`. + +.. code-block:: yaml + + file_mounts: + /checkpoint: + name: # NOTE: Fill in your bucket name + mode: MOUNT + +The :code:`MOUNT` mode in :ref:`SkyPilot bucket mounting ` ensures the checkpoints outputted to :code:`/checkpoint` are automatically synced to a persistent bucket. +Note that the application code should save program checkpoints periodically and reload those states when the job is restarted. +This is typically achieved by reloading the latest checkpoint at the beginning of your program. + +.. _spot-jobs-end-to-end: + +An End-to-End Example +~~~~~~~~~~~~~~~~~~~~~ + +Below we show an `example `_ for fine-tuning a BERT model on a question-answering task with HuggingFace. + +.. code-block:: yaml + :emphasize-lines: 13-16,42-45 + + # bert_qa.yaml + name: bert-qa + + resources: + accelerators: V100:1 + use_spot: true + + # Assume your working directory is under `~/transformers`. + # To make this example work, please run the following command: + # git clone https://github.com/huggingface/transformers.git ~/transformers -b v4.30.1 + workdir: ~/transformers + + file_mounts: + /checkpoint: + name: # NOTE: Fill in your bucket name + mode: MOUNT + + setup: | + # Fill in your wandb key: copy from https://wandb.ai/authorize + # Alternatively, you can use `--env WANDB_API_KEY=$WANDB_API_KEY` + # to pass the key in the command line, during `sky jobs launch`. + echo export WANDB_API_KEY=[YOUR-WANDB-API-KEY] >> ~/.bashrc + + pip install -e . + cd examples/pytorch/question-answering/ + pip install -r requirements.txt + pip install wandb + + run: | + cd ./examples/pytorch/question-answering/ + python run_qa.py \ + --model_name_or_path bert-base-uncased \ + --dataset_name squad \ + --do_train \ + --do_eval \ + --per_device_train_batch_size 12 \ + --learning_rate 3e-5 \ + --num_train_epochs 50 \ + --max_seq_length 384 \ + --doc_stride 128 \ + --report_to wandb \ + --run_name $SKYPILOT_TASK_ID \ + --output_dir /checkpoint/bert_qa/ \ + --save_total_limit 10 \ + --save_steps 1000 + + + +As HuggingFace has built-in support for periodically checkpointing, we only need to pass the highlighted arguments for setting up +the output directory and frequency of checkpointing (see more +on `Huggingface API `_). +You may also refer to another example `here `__ for periodically checkpointing with PyTorch. + +We also set :code:`--run_name` to :code:`$SKYPILOT_TASK_ID` so that the logs for all recoveries of the same job will be saved +to the same run in Weights & Biases. + +.. note:: + The environment variable :code:`$SKYPILOT_TASK_ID` (example: "sky-managed-2022-10-06-05-17-09-750781_bert-qa_8-0") can be used to identify the same job, i.e., it is kept identical across all + recoveries of the job. + It can be accessed in the task's :code:`run` commands or directly in the program itself (e.g., access + via :code:`os.environ` and pass to Weights & Biases for tracking purposes in your training script). It is made available to + the task whenever it is invoked. + +With the highlighted changes, the managed spot job can now resume training after preemption! We can enjoy the benefits of +cost savings from spot instances without worrying about preemption or losing progress. + +.. code-block:: console + + $ sky jobs launch -n bert-qa bert_qa.yaml + +.. tip:: + + Try copy-paste this example and adapt it to your own job. + + + +Real-World Examples +~~~~~~~~~~~~~~~~~~~ + +* `Vicuna `_ LLM chatbot: `instructions `_, `YAML `__ +* BERT (shown above): `YAML `__ +* PyTorch DDP, ResNet: `YAML `__ +* PyTorch Lightning DDP, CIFAR-10: `YAML `__ + + +.. _on-demand: + +Using On-Demand Instances +-------------------------------- + +The same ``sky jobs launch`` and YAML interfaces can run jobs on auto-recovering +on-demand instances. This is useful to have SkyPilot monitor any underlying +machine failures and transparently recover the job. + +To do so, simply set :code:`use_spot: false` in the :code:`resources` section, or override it with :code:`--use-spot false` in the CLI. + +.. code-block:: console + + $ sky jobs launch -n bert-qa bert_qa.yaml --use-spot false + +.. tip:: + + It is useful to think of ``sky jobs launch`` as a "serverless" managed job + interface, while ``sky launch`` is a cluster interface (that you can launch + tasks on, albeit not managed). + +Either Spot Or On-Demand +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +You can use ``any_of`` to specify either spot or on-demand instances as +candidate resources for a job. See documentation :ref:`here +` for more details. + +.. code-block:: yaml + + resources: + accelerators: A100:8 + any_of: + - use_spot: true + - use_spot: false + +In this example, SkyPilot will perform cost optimizations to select the resource to use, which almost certainly +will be spot instances. If spot instances are not available, SkyPilot will fall back to launch on-demand instances. + +More advanced policies for resource selection, such as the `Can't Be Late +`__ (NSDI'24) +paper, may be supported in the future. + +Useful CLIs +----------- + +Here are some commands for managed jobs. Check :code:`sky jobs --help` and :ref:`CLI reference ` for more details. + +See all managed jobs: + +.. code-block:: console + + $ sky jobs queue + +.. code-block:: console + + Fetching managed job statuses... + Managed jobs: + ID NAME RESOURCES SUBMITTED TOT. DURATION JOB DURATION #RECOVERIES STATUS + 2 roberta 1x [A100:8][Spot] 2 hrs ago 2h 47m 18s 2h 36m 18s 0 RUNNING + 1 bert-qa 1x [V100:1][Spot] 4 hrs ago 4h 24m 26s 4h 17m 54s 0 RUNNING + +Stream the logs of a running managed job: + +.. code-block:: console + + $ sky jobs logs -n bert-qa # by name + $ sky jobs logs 2 # by job ID + +Cancel a managed job: + +.. code-block:: console + + $ sky jobs cancel -n bert-qa # by name + $ sky jobs cancel 2 # by job ID + +.. note:: + If any failure happens for a managed job, you can check :code:`sky jobs queue -a` for the brief reason + of the failure. For more details, it would be helpful to check :code:`sky jobs logs --controller `. + + +.. _pipeline: + +Job Pipelines +------------- + +A pipeline is a managed job that contains a sequence of tasks running one after another. + +This is useful for running a sequence of tasks that depend on each other, e.g., training a model and then running inference on it. +Different tasks can have different resource requirements to use appropriate per-task resources, which saves costs, while keeping the burden of managing the tasks off the user. + +.. note:: + In other words, a managed job is either a single task or a pipeline of tasks. All managed jobs are submitted by :code:`sky jobs launch`. + +To run a pipeline, specify the sequence of tasks in a YAML file. Here is an example: + +.. code-block:: yaml + + name: pipeline + + --- + + name: train + + resources: + accelerators: V100:8 + any_of: + - use_spot: true + - use_spot: false + + file_mounts: + /checkpoint: + name: train-eval # NOTE: Fill in your bucket name + mode: MOUNT + + setup: | + echo setup for training + + run: | + echo run for training + echo save checkpoints to /checkpoint + + --- + + name: eval + + resources: + accelerators: T4:1 + use_spot: false + + file_mounts: + /checkpoint: + name: train-eval # NOTE: Fill in your bucket name + mode: MOUNT + + setup: | + echo setup for eval + + run: | + echo load trained model from /checkpoint + echo eval model on test set + + +The YAML above defines a pipeline with two tasks. The first :code:`name: +pipeline` names the pipeline. The first task has name :code:`train` and the +second task has name :code:`eval`. The tasks are separated by a line with three +dashes :code:`---`. Each task has its own :code:`resources`, :code:`setup`, and +:code:`run` sections. Tasks are executed sequentially. + +To submit the pipeline, the same command :code:`sky jobs launch` is used. The pipeline will be automatically launched and monitored by SkyPilot. You can check the status of the pipeline with :code:`sky jobs queue` or :code:`sky jobs dashboard`. + +.. code-block:: console + + $ sky jobs launch -n pipeline pipeline.yaml + $ sky jobs queue + Fetching managed job statuses... + Managed jobs + In progress jobs: 1 RECOVERING + ID TASK NAME RESOURCES SUBMITTED TOT. DURATION JOB DURATION #RECOVERIES STATUS + 8 pipeline - 50 mins ago 47m 45s - 1 RECOVERING + ↳ 0 train 1x [V100:8][Spot|On-demand] 50 mins ago 47m 45s - 1 RECOVERING + ↳ 1 eval 1x [T4:1] - - - 0 PENDING + +.. note:: + + The :code:`$SKYPILOT_TASK_ID` environment variable is also available in the :code:`run` section of each task. It is unique for each task in the pipeline. + For example, the :code:`$SKYPILOT_TASK_ID` for the :code:`eval` task above is: + "sky-managed-2022-10-06-05-17-09-750781_pipeline_eval_8-1". + + + +Dashboard +--------- + +Use ``sky jobs dashboard`` to open a dashboard to see all jobs: + +.. code-block:: console + + $ sky jobs dashboard + +This automatically opens a browser tab to show the dashboard: + +.. image:: ../images/job-dashboard.png + +The UI shows the same information as the CLI ``sky jobs queue -a``. The UI is +especially useful when there are many in-progress jobs to monitor, which the +terminal-based CLI may need more than one page to display. + + +Concept: Jobs Controller +------------------------ + +The jobs controller is a small on-demand CPU VM running in the cloud that manages all jobs of a user. +It is automatically launched when the first managed job is submitted, and it is autostopped after it has been idle for 10 minutes (i.e., after all managed jobs finish and no new managed job is submitted in that duration). +Thus, **no user action is needed** to manage its lifecycle. + +You can see the controller with :code:`sky status` and refresh its status by using the :code:`-r/--refresh` flag. + +While the cost of the jobs controller is negligible (~$0.4/hour when running and less than $0.004/hour when stopped), +you can still tear it down manually with +:code:`sky down `, where the ```` can be found in the output of :code:`sky status`. + +.. note:: + Tearing down the jobs controller loses all logs and status information for the finished managed jobs. It is only allowed when there are no in-progress managed jobs to ensure no resource leakage. + +Customizing Job Controller Resources +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +You may want to customize the resources of the jobs controller for several reasons: + +#. Changing the maximum number of jobs that can be run concurrently, which is 2x the vCPUs of the controller. (Default: 16) +#. Use a lower-cost controller (if you have a low number of concurrent managed jobs). +#. Enforcing the jobs controller to run on a specific location. (Default: cheapest location) +#. Changing the disk_size of the jobs controller to store more logs. (Default: 50GB) + +To achieve the above, you can specify custom configs in :code:`~/.sky/config.yaml` with the following fields: + +.. code-block:: yaml + + jobs: + # NOTE: these settings only take effect for a new jobs controller, not if + # you have an existing one. + controller: + resources: + # All configs below are optional. + # Specify the location of the jobs controller. + cloud: gcp + region: us-central1 + # Specify the maximum number of managed jobs that can be run concurrently. + cpus: 4+ # number of vCPUs, max concurrent jobs = 2 * cpus + # Specify the disk_size in GB of the jobs controller. + disk_size: 100 + +The :code:`resources` field has the same spec as a normal SkyPilot job; see `here `__. + +.. note:: + These settings will not take effect if you have an existing controller (either + stopped or live). For them to take effect, tear down the existing controller + first, which requires all in-progress jobs to finish or be canceled. + diff --git a/docs/source/examples/spot-jobs.rst b/docs/source/examples/spot-jobs.rst index 5940e404bb3..2b3df600425 100644 --- a/docs/source/examples/spot-jobs.rst +++ b/docs/source/examples/spot-jobs.rst @@ -1,389 +1,23 @@ -.. _spot-jobs: - Managed Spot Jobs -================================================ - -.. tip:: - - This feature is great for scaling out: running a single job for long durations, or running many jobs. - -SkyPilot supports managed spot jobs that can **automatically recover from preemptions**. -This feature **saves significant cost** (e.g., up to 70\% for GPU VMs) by making preemptible spot instances practical for long-running jobs. - -SkyPilot automatically finds available spot resources across regions and clouds to maximize availability. -Here is an example of a BERT training job failing over different regions across AWS and GCP. - -.. image:: https://i.imgur.com/Vteg3fK.gif - :width: 600 - :alt: GIF for BERT training on Spot V100 - -.. image:: ../images/spot-training.png - :width: 600 - :alt: Static plot, BERT training on Spot V100 - -To use managed spot jobs, there are two requirements: - -#. **Task YAML**: Managed Spot requires a YAML to describe the job, tested with :code:`sky launch`. -#. **Checkpointing** (optional): For job recovery due to preemptions, the user application code can checkpoint its progress periodically to a :ref:`mounted cloud bucket `. The program can reload the latest checkpoint when restarted. - - -Task YAML ---------- - -To launch a spot job, you can simply reuse your task YAML (recommended to test it with :code:`sky launch` first). -For example, we found the BERT fine-tuning YAML works with :code:`sky launch`, and want to -launch it with SkyPilot managed spot jobs. - -We can launch it with the following: - -.. code-block:: console - - $ sky spot launch -n bert-qa bert_qa.yaml - - -.. code-block:: yaml - - # bert_qa.yaml - name: bert-qa - - resources: - accelerators: V100:1 - - # Assume your working directory is under `~/transformers`. - # To make this example work, please run the following command: - # git clone https://github.com/huggingface/transformers.git ~/transformers -b v4.30.1 - workdir: ~/transformers - - setup: | - # Fill in your wandb key: copy from https://wandb.ai/authorize - # Alternatively, you can use `--env WANDB_API_KEY=$WANDB_API_KEY` - # to pass the key in the command line, during `sky spot launch`. - echo export WANDB_API_KEY=[YOUR-WANDB-API-KEY] >> ~/.bashrc - - pip install -e . - cd examples/pytorch/question-answering/ - pip install -r requirements.txt torch==1.12.1+cu113 --extra-index-url https://download.pytorch.org/whl/cu113 - pip install wandb - - run: | - cd ./examples/pytorch/question-answering/ - python run_qa.py \ - --model_name_or_path bert-base-uncased \ - --dataset_name squad \ - --do_train \ - --do_eval \ - --per_device_train_batch_size 12 \ - --learning_rate 3e-5 \ - --num_train_epochs 50 \ - --max_seq_length 384 \ - --doc_stride 128 \ - --report_to wandb - - -.. note:: - - :ref:`workdir ` and :ref:`file mounts with local files ` will be automatically uploaded to a - :ref:`cloud bucket `. The bucket will be created during the job running time, and cleaned up after the job - finishes. - -SkyPilot will launch and start monitoring the spot job. When a preemption happens, SkyPilot will automatically -search for resources across regions and clouds to re-launch the job. - -In this example, the job will be restarted from scratch after each preemption recovery. -To resume the job from previous states, user's application needs to implement checkpointing and recovery. - - -Checkpointing and recovery --------------------------- - -To allow spot recovery, a cloud bucket is typically needed to store the job's states (e.g., model checkpoints). -Below is an example of mounting a bucket to :code:`/checkpoint`. - -.. code-block:: yaml - - file_mounts: - /checkpoint: - name: # NOTE: Fill in your bucket name - mode: MOUNT - -The :code:`MOUNT` mode in :ref:`SkyPilot bucket mounting ` ensures the checkpoints outputted to :code:`/checkpoint` are automatically synced to a persistent bucket. -Note that the application code should save program checkpoints periodically and reload those states when the job is restarted. -This is typically achieved by reloading the latest checkpoint at the beginning of your program. - -.. _spot-jobs-end-to-end: - -An end-to-end example ---------------------- - -Below we show an `example `_ for fine-tuning a BERT model on a question-answering task with HuggingFace. - -.. code-block:: yaml - :emphasize-lines: 12-15,41-44 - - # bert_qa.yaml - name: bert-qa - - resources: - accelerators: V100:1 - - # Assume your working directory is under `~/transformers`. - # To make this example work, please run the following command: - # git clone https://github.com/huggingface/transformers.git ~/transformers -b v4.30.1 - workdir: ~/transformers - - file_mounts: - /checkpoint: - name: # NOTE: Fill in your bucket name - mode: MOUNT - - setup: | - # Fill in your wandb key: copy from https://wandb.ai/authorize - # Alternatively, you can use `--env WANDB_API_KEY=$WANDB_API_KEY` - # to pass the key in the command line, during `sky spot launch`. - echo export WANDB_API_KEY=[YOUR-WANDB-API-KEY] >> ~/.bashrc - - pip install -e . - cd examples/pytorch/question-answering/ - pip install -r requirements.txt - pip install wandb - - run: | - cd ./examples/pytorch/question-answering/ - python run_qa.py \ - --model_name_or_path bert-base-uncased \ - --dataset_name squad \ - --do_train \ - --do_eval \ - --per_device_train_batch_size 12 \ - --learning_rate 3e-5 \ - --num_train_epochs 50 \ - --max_seq_length 384 \ - --doc_stride 128 \ - --report_to wandb \ - --run_name $SKYPILOT_TASK_ID \ - --output_dir /checkpoint/bert_qa/ \ - --save_total_limit 10 \ - --save_steps 1000 - - - -As HuggingFace has built-in support for periodically checkpointing, we only need to pass the highlighted arguments for setting up -the output directory and frequency of checkpointing (see more -on `Huggingface API `_). -You may also refer to another example `here `__ for periodically checkpointing with PyTorch. - -We also set :code:`--run_name` to :code:`$SKYPILOT_TASK_ID` so that the logs for all recoveries of the same job will be saved -to the same run in Weights & Biases. - -.. note:: - The environment variable :code:`$SKYPILOT_TASK_ID` (example: "sky-managed-2022-10-06-05-17-09-750781_pipeline_eval_8-1") can be used to identify the same job, i.e., it is kept identical across all - recoveries of the job. - It can be accessed in the task's :code:`run` commands or directly in the program itself (e.g., access - via :code:`os.environ` and pass to Weights & Biases for tracking purposes in your training script). It is made available to - the task whenever it is invoked. - -With the highlighted changes, the managed spot job can now resume training after preemption with ``sky spot launch``! We can enjoy the benefits of -cost savings from spot instances without worrying about preemption or losing progress. - -.. code-block:: console - - $ sky spot launch -n bert-qa bert_qa.yaml - -.. tip:: - - Try copy-paste this example and adapt it to your own job. - - -Useful CLIs ------------ - -Here are some commands for managed spot jobs. Check :code:`sky spot --help` for more details. - -See all spot jobs: - -.. code-block:: console - - $ sky spot queue - -.. code-block:: console - - Fetching managed spot job statuses... - Managed spot jobs: - ID NAME RESOURCES SUBMITTED TOT. DURATION JOB DURATION #RECOVERIES STATUS - 2 roberta 1x [A100:8] 2 hrs ago 2h 47m 18s 2h 36m 18s 0 RUNNING - 1 bert-qa 1x [V100:1] 4 hrs ago 4h 24m 26s 4h 17m 54s 0 RUNNING - -Stream the logs of a running spot job: - -.. code-block:: console - - $ sky spot logs -n bert-qa # by name - $ sky spot logs 2 # by job ID - -Cancel a spot job: - -.. code-block:: console - - $ sky spot cancel -n bert-qa # by name - $ sky spot cancel 2 # by job ID - -.. note:: - If any failure happens for a spot job, you can check :code:`sky spot queue -a` for the brief reason - of the failure. For more details, it would be helpful to check :code:`sky spot logs --controller `. - -Dashboard ------------ - -Use ``sky spot dashboard`` to open a dashboard to see all jobs: - -.. code-block:: console - - $ sky spot dashboard - -This automatically opens a browser tab to show the dashboard: - -.. image:: ../images/spot-dashboard.png - -The UI shows the same information as the CLI ``sky spot queue -a``. The UI is -especially useful when there are many in-progress jobs to monitor, which the -terminal-based CLI may need more than one page to display. - -Real-world examples -------------------------- - -* `Vicuna `_ LLM chatbot: `instructions `_, `YAML `__ -* BERT (shown above): `YAML `__ -* PyTorch DDP, ResNet: `YAML `__ -* PyTorch Lightning DDP, CIFAR-10: `YAML `__ - -Spot controller -------------------------------- - -The spot controller is a small on-demand CPU VM running in the cloud that manages all spot jobs of a user. -It is automatically launched when the first managed spot job is submitted, and it is autostopped after it has been idle for 10 minutes (i.e., after all spot jobs finish and no new spot job is submitted in that duration). -Thus, **no user action is needed** to manage its lifecycle. - -You can see the controller with :code:`sky status` and refresh its status by using the :code:`-r/--refresh` flag. - -While the cost of the spot controller is negligible (~$0.4/hour when running and less than $0.004/hour when stopped), -you can still tear it down manually with -:code:`sky down `, where the ```` can be found in the output of :code:`sky status`. - -.. note:: - Tearing down the spot controller loses all logs and status information for the finished spot jobs. It is only allowed when there are no in-progress spot jobs to ensure no resource leakage. - -Customizing spot controller resources -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -You may want to customize the resources of the spot controller for several reasons: - -1. Use a lower-cost controller (if you have a low number of concurrent spot jobs). -2. Enforcing the spot controller to run on a specific location. (Default: cheapest location) -3. Changing the maximum number of spot jobs that can be run concurrently, which is 2x the vCPUs of the controller. (Default: 16) -4. Changing the disk_size of the spot controller to store more logs. (Default: 50GB) - -To achieve the above, you can specify custom configs in :code:`~/.sky/config.yaml` with the following fields: - -.. code-block:: yaml - - spot: - # NOTE: these settings only take effect for a new spot controller, not if - # you have an existing one. - controller: - resources: - # All configs below are optional. - # Specify the location of the spot controller. - cloud: gcp - region: us-central1 - # Specify the maximum number of spot jobs that can be run concurrently. - cpus: 4+ # number of vCPUs, max concurrent spot jobs = 2 * cpus - # Specify the disk_size in GB of the spot controller. - disk_size: 100 - -The :code:`resources` field has the same spec as a normal SkyPilot job; see `here `__. - -.. note:: - These settings will not take effect if you have an existing controller (either - stopped or live). For them to take effect, tear down the existing controller - first, which requires all in-progress spot jobs to finish or be canceled. - - -Spot Pipeline -------------------------- - -Spot Pipeline is a feature that allows you to submit a spot job that contains a sequence of spot tasks running one after another. -This is useful for running a sequence of jobs that depend on each other, e.g., training a model and then running inference on it. -This allows the multiple tasks to have different resource requirements to fully utilize the resources and save cost, while keeping the burden of managing the tasks off the user. - -.. note:: - A spot job is either a single task or a pipeline of tasks. A spot job is submitted by :code:`sky spot launch`. - - All tasks in a pipeline will be run on spot instances. - -To use Spot Pipeline, you can specify the sequence of jobs in a YAML file. Here is an example: - -.. code-block:: yaml - - name: pipeline - - --- - - name: train - - resources: - accelerators: V100:8 - - file_mounts: - /checkpoint: - name: train-eval # NOTE: Fill in your bucket name - mode: MOUNT - - setup: | - echo setup for training - - run: | - echo run for training - echo save checkpoints to /checkpoint - - --- - - name: eval - - resources: - accelerators: T4:1 - - file_mounts: - /checkpoint: - name: train-eval # NOTE: Fill in your bucket name - mode: MOUNT - - setup: | - echo setup for eval - - run: | - echo load trained model from /checkpoint - echo eval model on test set - - -The above YAML file defines a pipeline with two tasks. The first :code:`name: pipeline` names the pipeline. The first task has name :code:`train` and the second task has name :code:`eval`. The tasks are separated by a line with three dashes :code:`---`. Each task has its own :code:`resources`, :code:`setup`, and :code:`run` sections. The :code:`setup` and :code:`run` sections are executed sequentially. - -To submit the pipeline, the same command :code:`sky spot launch` is used. The pipeline will be automatically launched and monitored by SkyPilot. You can check the status of the pipeline with :code:`sky spot queue` or :code:`sky spot dashboard`. - -.. note:: - - The :code:`$SKYPILOT_TASK_ID` environment variable is also available in the :code:`run` section of each task. It is unique for each task in the pipeline. - For example, the :code:`$SKYPILOT_TASK_ID` for the :code:`eval` task above is: - "sky-managed-2022-10-06-05-17-09-750781_pipeline_eval_8-1". - -.. code-block:: console - - $ sky spot launch -n pipeline pipeline.yaml - $ sky spot queue - Fetching managed spot job statuses... - Managed spot jobs - In progress tasks: 1 PENDING, 1 RECOVERING - ID TASK NAME RESOURCES SUBMITTED TOT. DURATION JOB DURATION #RECOVERIES STATUS - 8 pipeline - 50 mins ago 47m 45s - 1 RECOVERING - ↳ 0 train 1x [V100:8] 50 mins ago 47m 45s - 1 RECOVERING - ↳ 1 eval 1x [T4:1] - - - 0 PENDING - +================== + +.. raw:: html + + diff --git a/docs/source/images/job-dashboard.png b/docs/source/images/job-dashboard.png new file mode 100644 index 00000000000..6c25d7d83bc Binary files /dev/null and b/docs/source/images/job-dashboard.png differ diff --git a/docs/source/images/managed-jobs-arch.png b/docs/source/images/managed-jobs-arch.png new file mode 100644 index 00000000000..c78f0680331 Binary files /dev/null and b/docs/source/images/managed-jobs-arch.png differ diff --git a/docs/source/images/spot-controller.png b/docs/source/images/spot-controller.png deleted file mode 100644 index fce9a1d8cc2..00000000000 Binary files a/docs/source/images/spot-controller.png and /dev/null differ diff --git a/docs/source/images/spot-dashboard.png b/docs/source/images/spot-dashboard.png deleted file mode 100644 index 2b322418350..00000000000 Binary files a/docs/source/images/spot-dashboard.png and /dev/null differ diff --git a/docs/source/reference/cli.rst b/docs/source/reference/cli.rst index d4c45ce7b82..985f63482b6 100644 --- a/docs/source/reference/cli.rst +++ b/docs/source/reference/cli.rst @@ -3,8 +3,8 @@ Command Line Interface ====================== -Core CLI ---------- +Cluster CLI +----------- .. _sky-launch: .. click:: sky.cli:launch @@ -41,9 +41,6 @@ Core CLI :prog: sky autostop :nested: full -Job Queue CLI --------------- - .. _sky-queue: .. click:: sky.cli:queue :prog: sky queue @@ -59,7 +56,31 @@ Job Queue CLI :prog: sky cancel :nested: full -Sky Serve CLI +Managed (Spot) Jobs CLI +--------------------------- + +.. _sky-job-launch: +.. click:: sky.cli:jobs_launch + :prog: sky jobs launch + :nested: full + +.. _sky-job-queue: +.. click:: sky.cli:jobs_queue + :prog: sky jobs queue + :nested: full + +.. _sky-job-cancel: +.. click:: sky.cli:jobs_cancel + :prog: sky jobs cancel + :nested: full + +.. _sky-job-logs: +.. click:: sky.cli:jobs_logs + :prog: sky jobs logs + :nested: full + + +SkyServe CLI ------------- .. click:: sky.cli:serve_up @@ -82,28 +103,6 @@ Sky Serve CLI :prog: sky serve update :nested: full -Managed Spot Jobs CLI ---------------------------- - -.. _sky-spot-launch: -.. click:: sky.cli:spot_launch - :prog: sky spot launch - :nested: full - -.. _sky-spot-queue: -.. click:: sky.cli:spot_queue - :prog: sky spot queue - :nested: full - -.. _sky-spot-cancel: -.. click:: sky.cli:spot_cancel - :prog: sky spot cancel - :nested: full - -.. _sky-spot-logs: -.. click:: sky.cli:spot_logs - :prog: sky spot logs - :nested: full Storage CLI ------------ diff --git a/docs/source/reference/config.rst b/docs/source/reference/config.rst index 9bca0a796d7..1dfda834ee0 100644 --- a/docs/source/reference/config.rst +++ b/docs/source/reference/config.rst @@ -14,12 +14,12 @@ Available fields and semantics: .. code-block:: yaml - # Custom spot controller resources (optional). + # Custom managed jobs controller resources (optional). # - # These take effects only when a spot controller does not already exist. + # These take effects only when a managed jobs controller does not already exist. # - # Ref: https://skypilot.readthedocs.io/en/latest/examples/spot-jobs.html#customizing-spot-controller-resources - spot: + # Ref: https://skypilot.readthedocs.io/en/latest/examples/managed-jobs.html#customizing-job-controller-resources + jobs: controller: resources: # same spec as 'resources' in a task YAML cloud: gcp @@ -114,7 +114,7 @@ Available fields and semantics: # LOCAL_CREDENTIALS: The user's local credential files will be uploaded to # AWS instances created by SkyPilot. They are used for accessing cloud # resources (e.g., private buckets) or launching new instances (e.g., for - # spot/serve controllers). + # jobs/serve controllers). # # SERVICE_ACCOUNT: Local credential files are not uploaded to AWS # instances. SkyPilot will auto-create and reuse a service account (IAM @@ -125,8 +125,8 @@ Available fields and semantics: # - This only affects AWS instances. Local AWS credentials will still be # uploaded to non-AWS instances (since those instances may need to access # AWS resources). - # - If the SkyPilot spot/serve controller is on AWS, this setting will make - # non-AWS managed spot jobs / non-AWS service replicas fail to access any + # - If the SkyPilot jobs/serve controller is on AWS, this setting will make + # non-AWS managed jobs / non-AWS service replicas fail to access any # resources on AWS (since the controllers don't have AWS credential # files to assign to these non-AWS instances). # @@ -224,7 +224,7 @@ Available fields and semantics: # LOCAL_CREDENTIALS: The user's local credential files will be uploaded to # GCP instances created by SkyPilot. They are used for accessing cloud # resources (e.g., private buckets) or launching new instances (e.g., for - # spot/serve controllers). + # jobs/serve controllers). # # SERVICE_ACCOUNT: Local credential files are not uploaded to GCP # instances. SkyPilot will auto-create and reuse a service account for GCP @@ -235,8 +235,8 @@ Available fields and semantics: # - This only affects GCP instances. Local GCP credentials will still be # uploaded to non-GCP instances (since those instances may need to access # GCP resources). - # - If the SkyPilot spot/serve controller is on GCP, this setting will make - # non-GCP managed spot jobs / non-GCP service replicas fail to access any + # - If the SkyPilot jobs/serve controller is on GCP, this setting will make + # non-GCP managed jobs / non-GCP service replicas fail to access any # resources on GCP (since the controllers don't have GCP credential # files to assign to these non-GCP instances). # diff --git a/docs/source/reference/job-queue.rst b/docs/source/reference/job-queue.rst index 47be2012365..6397c7bbbb6 100644 --- a/docs/source/reference/job-queue.rst +++ b/docs/source/reference/job-queue.rst @@ -1,7 +1,7 @@ .. _job-queue: -Job Queue -========= +Cluster Job Queue +================= SkyPilot's **job queue** allows multiple jobs to be scheduled on a cluster. diff --git a/docs/source/reference/yaml-spec.rst b/docs/source/reference/yaml-spec.rst index 14b5d428d42..1e56240989c 100644 --- a/docs/source/reference/yaml-spec.rst +++ b/docs/source/reference/yaml-spec.rst @@ -92,10 +92,21 @@ Available fields: # If unspecified, defaults to False (on-demand instances). use_spot: False - # The recovery strategy for spot jobs (optional). - # `use_spot` must be True for this to have any effect. For now, only - # `FAILOVER` strategy is supported. - spot_recovery: none + # The recovery strategy for managed jobs (optional). + # + # In effect for managed jobs. Possible values are `FAILOVER` and `EAGER_NEXT_REGION`. + # + # If `FAILOVER` is specified, the job will be restarted in the same region + # if the node fails, and go to the next region if no available resources + # are found in the same region. + # + # If `EAGER_NEXT_REGION` is specified, the job will go to the next region + # directly if the node fails. This is useful for spot instances, as in + # practice, preemptions in a region usually indicate a shortage of resources + # in that region. + # + # default: EAGER_NEXT_REGION + job_recovery: none # Disk size in GB to allocate for OS (mounted at /). Increase this if you # have a large working directory or tasks that write out large outputs. diff --git a/examples/managed_job.yaml b/examples/managed_job.yaml new file mode 100644 index 00000000000..4bfcb63f40a --- /dev/null +++ b/examples/managed_job.yaml @@ -0,0 +1,16 @@ +name: minimal + +setup: | + echo "running setup" + pip install tqdm + +run: | + conda env list + python -u - << EOF + import time + import tqdm + + for i in tqdm.trange(240): + time.sleep(1) + + EOF diff --git a/examples/managed_spot_with_storage.yaml b/examples/managed_job_with_storage.yaml similarity index 83% rename from examples/managed_spot_with_storage.yaml rename to examples/managed_job_with_storage.yaml index 1b81e459bb4..ecefccd8b3d 100644 --- a/examples/managed_spot_with_storage.yaml +++ b/examples/managed_job_with_storage.yaml @@ -3,13 +3,13 @@ # Runs a task that uses cloud buckets for uploading and accessing files. # # Usage: -# sky spot launch -c spot-storage examples/managed_spot_with_storage.yaml +# sky spot launch -c spot-storage examples/managed_job_with_storage.yaml # sky down spot-storage resources: cloud: aws use_spot: true - spot_recovery: failover + job_recovery: failover workdir: ./examples @@ -41,8 +41,8 @@ file_mounts: run: | set -ex - ls ~/sky_workdir/managed_spot_with_storage.yaml - ls ~/bucket_workdir/managed_spot_with_storage.yaml + ls ~/sky_workdir/managed_job_with_storage.yaml + ls ~/bucket_workdir/managed_job_with_storage.yaml ls -l /imagenet-image/datasets diff --git a/examples/managed_spot.yaml b/examples/managed_spot.yaml index 4bfcb63f40a..712819eb9ca 100644 --- a/examples/managed_spot.yaml +++ b/examples/managed_spot.yaml @@ -1,5 +1,8 @@ name: minimal +resources: + use_spot: true + setup: | echo "running setup" pip install tqdm diff --git a/sky/__init__.py b/sky/__init__.py index d25c8297ea5..a077fb8966a 100644 --- a/sky/__init__.py +++ b/sky/__init__.py @@ -102,16 +102,16 @@ def set_proxy_env_var(proxy_var: str, urllib_var: Optional[str]): from sky.data import StoreType from sky.execution import exec # pylint: disable=redefined-builtin from sky.execution import launch +# TODO (zhwu): These imports are for backward compatibility, and spot APIs +# should be called with `sky.spot.xxx` instead. Remove in release 0.8.0 +from sky.jobs.core import spot_cancel +from sky.jobs.core import spot_launch +from sky.jobs.core import spot_queue +from sky.jobs.core import spot_tail_logs from sky.optimizer import Optimizer from sky.optimizer import OptimizeTarget from sky.resources import Resources from sky.skylet.job_lib import JobStatus -# TODO (zhwu): These imports are for backward compatibility, and spot APIs -# should be called with `sky.spot.xxx` instead. Remove in release 0.7.0 -from sky.spot.core import spot_cancel -from sky.spot.core import spot_launch -from sky.spot.core import spot_queue -from sky.spot.core import spot_tail_logs from sky.status_lib import ClusterStatus from sky.task import Task diff --git a/sky/authentication.py b/sky/authentication.py index 7b5699d3337..581fdc12c7f 100644 --- a/sky/authentication.py +++ b/sky/authentication.py @@ -15,7 +15,7 @@ The local machine's public key should not be uploaded to the `~/.ssh/sky-key.pub` on the remote VM, because it will cause private/public key pair mismatch when the user tries to launch new VM from that remote VM -using SkyPilot, e.g., the node is used as a spot controller. (Lambda cloud +using SkyPilot, e.g., the node is used as a jobs controller. (Lambda cloud is an exception, due to the limitation of the cloud provider. See the comments in setup_lambda_authentication) """ diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index 5aed22b05ed..fecbcaad0b8 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -2239,18 +2239,18 @@ def check_cluster_available( # TODO(tian): Refactor to controller_utils. Current blocker: circular import. def is_controller_accessible( - controller_type: controller_utils.Controllers, + controller: controller_utils.Controllers, stopped_message: str, non_existent_message: Optional[str] = None, exit_if_not_accessible: bool = False, ) -> 'backends.CloudVmRayResourceHandle': - """Check if the spot/serve controller is up. + """Check if the jobs/serve controller is up. The controller is accessible when it is in UP or INIT state, and the ssh connection is successful. It can be used to check if the controller is accessible (since the autostop - is set for the controller) before the spot/serve commands interact with the + is set for the controller) before the jobs/serve commands interact with the controller. ClusterNotUpError will be raised whenever the controller cannot be accessed. @@ -2274,10 +2274,8 @@ def is_controller_accessible( failed to be connected. """ if non_existent_message is None: - non_existent_message = ( - controller_type.value.default_hint_if_non_existent) - cluster_name = controller_type.value.cluster_name - controller_name = controller_type.value.name.replace(' controller', '') + non_existent_message = controller.value.default_hint_if_non_existent + cluster_name = controller.value.cluster_name need_connection_check = False controller_status, handle = None, None try: @@ -2299,7 +2297,7 @@ def is_controller_accessible( # will not start the controller manually from the cloud console. # # The acquire_lock_timeout is set to 0 to avoid hanging the command when - # multiple spot.launch commands are running at the same time. Our later + # multiple jobs.launch commands are running at the same time. Our later # code will check if the controller is accessible by directly checking # the ssh connection to the controller, if it fails to get accurate # status of the controller. @@ -2311,6 +2309,7 @@ def is_controller_accessible( # We do not catch the exceptions related to the cluster owner identity # mismatch, please refer to the comment in # `backend_utils.check_cluster_available`. + controller_name = controller.value.name.replace(' controller', '') logger.warning( 'Failed to get the status of the controller. It is not ' f'fatal, but {controller_name} commands/calls may hang or return ' @@ -2336,7 +2335,7 @@ def is_controller_accessible( elif (controller_status == status_lib.ClusterStatus.INIT or need_connection_check): # Check ssh connection if (1) controller is in INIT state, or (2) we failed to fetch the - # status, both of which can happen when controller's status lock is held by another `sky spot launch` or + # status, both of which can happen when controller's status lock is held by another `sky jobs launch` or # `sky serve up`. If we have controller's head_ip available and it is ssh-reachable, # we can allow access to the controller. ssh_credentials = ssh_credential_from_yaml(handle.cluster_yaml, @@ -2347,7 +2346,7 @@ def is_controller_accessible( **ssh_credentials, port=handle.head_ssh_port) if not runner.check_connection(): - error_msg = controller_type.value.connection_error_hint + error_msg = controller.value.connection_error_hint else: assert controller_status == status_lib.ClusterStatus.UP, handle @@ -2386,7 +2385,7 @@ def get_clusters( of the clusters. Args: - include_controller: Whether to include controllers, e.g. spot controller + include_controller: Whether to include controllers, e.g. jobs controller or sky serve controller. refresh: Whether to refresh the status of the clusters. (Refreshing will set the status to STOPPED if the cluster cannot be pinged.) @@ -2546,8 +2545,8 @@ def get_task_demands_dict(task: 'task_lib.Task') -> Dict[str, float]: optionally accelerator demands. """ # TODO: Custom CPU and other memory resources are not supported yet. - # For sky spot/serve controller task, we set the CPU resource to a smaller - # value to support a larger number of spot jobs and services. + # For sky jobs/serve controller task, we set the CPU resource to a smaller + # value to support a larger number of managed jobs and services. resources_dict = { 'CPU': (constants.CONTROLLER_PROCESS_CPU_DEMAND if task.is_controller_task() else DEFAULT_TASK_CPU_DEMAND) @@ -2564,41 +2563,58 @@ def get_task_demands_dict(task: 'task_lib.Task') -> Dict[str, float]: return resources_dict -def get_task_resources_str(task: 'task_lib.Task') -> str: +def get_task_resources_str(task: 'task_lib.Task', + is_managed_job: bool = False) -> str: """Returns the resources string of the task. The resources string is only used as a display purpose, so we only show the accelerator demands (if any). Otherwise, the CPU demand is shown. """ - task_cpu_demand = (constants.CONTROLLER_PROCESS_CPU_DEMAND if - task.is_controller_task() else DEFAULT_TASK_CPU_DEMAND) + spot_str = '' + task_cpu_demand = (str(constants.CONTROLLER_PROCESS_CPU_DEMAND) + if task.is_controller_task() else + str(DEFAULT_TASK_CPU_DEMAND)) if task.best_resources is not None: accelerator_dict = task.best_resources.accelerators + if is_managed_job: + if task.best_resources.use_spot: + spot_str = '[Spot]' + task_cpu_demand = task.best_resources.cpus if accelerator_dict is None: resources_str = f'CPU:{task_cpu_demand}' else: resources_str = ', '.join( f'{k}:{v}' for k, v in accelerator_dict.items()) - elif len(task.resources) == 1: - resources_dict = list(task.resources)[0].accelerators - if resources_dict is None: - resources_str = f'CPU:{task_cpu_demand}' - else: - resources_str = ', '.join( - f'{k}:{v}' for k, v in resources_dict.items()) else: resource_accelerators = [] + min_cpus = float('inf') + spot_type: Set[str] = set() for resource in task.resources: + task_cpu_demand = '1+' + if resource.cpus is not None: + task_cpu_demand = resource.cpus + min_cpus = min(min_cpus, float(task_cpu_demand.strip('+ '))) + if resource.use_spot: + spot_type.add('Spot') + else: + spot_type.add('On-demand') + if resource.accelerators is None: continue for k, v in resource.accelerators.items(): resource_accelerators.append(f'{k}:{v}') + if is_managed_job: + if len(task.resources) > 1: + task_cpu_demand = f'{min_cpus}+' + if 'Spot' in spot_type: + spot_str = '|'.join(sorted(spot_type)) + spot_str = f'[{spot_str}]' if resource_accelerators: resources_str = ', '.join(set(resource_accelerators)) else: resources_str = f'CPU:{task_cpu_demand}' - resources_str = f'{task.num_nodes}x [{resources_str}]' + resources_str = f'{task.num_nodes}x[{resources_str}]{spot_str}' return resources_str diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 3196c45da55..f916d931b5f 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -28,12 +28,12 @@ from sky import clouds from sky import exceptions from sky import global_user_state +from sky import jobs as managed_jobs from sky import optimizer from sky import provision as provision_lib from sky import resources as resources_lib from sky import serve as serve_lib from sky import sky_logging -from sky import spot as spot_lib from sky import status_lib from sky import task as task_lib from sky.backends import backend_utils @@ -3115,7 +3115,7 @@ def _exec_code_on_head( codegen: str, job_id: int, detach_run: bool = False, - spot_dag: Optional['dag.Dag'] = None, + managed_job_dag: Optional['dag.Dag'] = None, ) -> None: """Executes generated code on the head node.""" style = colorama.Style @@ -3145,22 +3145,24 @@ def _exec_code_on_head( code = job_lib.JobLibCodeGen.queue_job(job_id, job_submit_cmd) job_submit_cmd = ' && '.join([mkdir_code, create_script_code, code]) - if spot_dag is not None: - # Add the spot job to spot queue table. - spot_codegen = spot_lib.SpotCodeGen() - spot_code = spot_codegen.set_pending(job_id, spot_dag) - # Set the spot job to PENDING state to make sure that this spot - # job appears in the `sky spot queue`, when there are already 16 - # controller process jobs running on the controller VM with 8 - # CPU cores. - # The spot job should be set to PENDING state *after* the + if managed_job_dag is not None: + # Add the managed job to job queue database. + managed_job_codegen = managed_jobs.ManagedJobCodeGen() + managed_job_code = managed_job_codegen.set_pending( + job_id, managed_job_dag) + # Set the managed job to PENDING state to make sure that this + # managed job appears in the `sky jobs queue`, when there are + # already 2x vCPU controller processes running on the controller VM, + # e.g., 16 controller processes running on a controller with 8 + # vCPUs. + # The managed job should be set to PENDING state *after* the # controller process job has been queued, as our skylet on spot - # controller will set the spot job in FAILED state if the + # controller will set the managed job in FAILED state if the # controller process job does not exist. - # We cannot set the spot job to PENDING state in the codegen for + # We cannot set the managed job to PENDING state in the codegen for # the controller process job, as it will stay in the job pending # table and not be executed until there is an empty slot. - job_submit_cmd = job_submit_cmd + ' && ' + spot_code + job_submit_cmd = job_submit_cmd + ' && ' + managed_job_code returncode, stdout, stderr = self.run_on_head(handle, job_submit_cmd, @@ -3181,8 +3183,9 @@ def _exec_code_on_head( try: if not detach_run: - if handle.cluster_name == spot_lib.SPOT_CONTROLLER_NAME: - self.tail_spot_logs(handle, job_id) + if (handle.cluster_name in controller_utils.Controllers. + JOBS_CONTROLLER.value.candidate_cluster_names): + self.tail_managed_job_logs(handle, job_id) else: # Sky logs. Not using subprocess.run since it will make the # ssh keep connected after ctrl-c. @@ -3190,24 +3193,24 @@ def _exec_code_on_head( finally: name = handle.cluster_name controller = controller_utils.Controllers.from_name(name) - if controller == controller_utils.Controllers.SPOT_CONTROLLER: + if controller == controller_utils.Controllers.JOBS_CONTROLLER: logger.info( - f'{fore.CYAN}Spot Job ID: ' + f'{fore.CYAN}Managed Job ID: ' f'{style.BRIGHT}{job_id}{style.RESET_ALL}' '\nTo cancel the job:\t\t' - f'{backend_utils.BOLD}sky spot cancel {job_id}' + f'{backend_utils.BOLD}sky jobs cancel {job_id}' f'{backend_utils.RESET_BOLD}' '\nTo stream job logs:\t\t' - f'{backend_utils.BOLD}sky spot logs {job_id}' + f'{backend_utils.BOLD}sky jobs logs {job_id}' f'{backend_utils.RESET_BOLD}' f'\nTo stream controller logs:\t' - f'{backend_utils.BOLD}sky spot logs --controller {job_id}' + f'{backend_utils.BOLD}sky jobs logs --controller {job_id}' f'{backend_utils.RESET_BOLD}' - '\nTo view all spot jobs:\t\t' - f'{backend_utils.BOLD}sky spot queue' + '\nTo view all managed jobs:\t' + f'{backend_utils.BOLD}sky jobs queue' f'{backend_utils.RESET_BOLD}' - '\nTo view the spot job dashboard:\t' - f'{backend_utils.BOLD}sky spot dashboard' + '\nTo view managed job dashboard:\t' + f'{backend_utils.BOLD}sky jobs dashboard' f'{backend_utils.RESET_BOLD}') elif controller is None: logger.info(f'{fore.CYAN}Job ID: ' @@ -3537,12 +3540,12 @@ def _rsync_down(args) -> None: def tail_logs(self, handle: CloudVmRayResourceHandle, job_id: Optional[int], - spot_job_id: Optional[int] = None, + managed_job_id: Optional[int] = None, follow: bool = True) -> int: code = job_lib.JobLibCodeGen.tail_logs(job_id, - spot_job_id=spot_job_id, + managed_job_id=managed_job_id, follow=follow) - if job_id is None and spot_job_id is None: + if job_id is None and managed_job_id is None: logger.info( 'Job ID not provided. Streaming the logs of the latest job.') @@ -3569,17 +3572,19 @@ def tail_logs(self, returncode = e.code return returncode - def tail_spot_logs(self, - handle: CloudVmRayResourceHandle, - job_id: Optional[int] = None, - job_name: Optional[str] = None, - follow: bool = True) -> None: + def tail_managed_job_logs(self, + handle: CloudVmRayResourceHandle, + job_id: Optional[int] = None, + job_name: Optional[str] = None, + follow: bool = True) -> None: # if job_name is not None, job_id should be None assert job_name is None or job_id is None, (job_name, job_id) if job_name is not None: - code = spot_lib.SpotCodeGen.stream_logs_by_name(job_name, follow) + code = managed_jobs.ManagedJobCodeGen.stream_logs_by_name( + job_name, follow) else: - code = spot_lib.SpotCodeGen.stream_logs_by_id(job_id, follow) + code = managed_jobs.ManagedJobCodeGen.stream_logs_by_id( + job_id, follow) # With the stdin=subprocess.DEVNULL, the ctrl-c will not directly # kill the process, so we need to handle it manually here. @@ -4565,8 +4570,8 @@ def _get_task_env_vars(self, task: task_lib.Task, job_id: int, handle: CloudVmRayResourceHandle) -> Dict[str, str]: """Returns the environment variables for the task.""" env_vars = task.envs.copy() - # If it is a managed spot job, the TASK_ID_ENV_VAR will have been - # already set by the controller. + # If it is a managed job, the TASK_ID_ENV_VAR will have been already set + # by the controller. if constants.TASK_ID_ENV_VAR not in env_vars: env_vars[ constants.TASK_ID_ENV_VAR] = common_utils.get_global_job_id( @@ -4618,7 +4623,7 @@ def _execute_task_one_node(self, handle: CloudVmRayResourceHandle, codegen.build(), job_id, detach_run=detach_run, - spot_dag=task.spot_dag) + managed_job_dag=task.managed_job_dag) def _execute_task_n_nodes(self, handle: CloudVmRayResourceHandle, task: task_lib.Task, job_id: int, @@ -4673,4 +4678,4 @@ def _execute_task_n_nodes(self, handle: CloudVmRayResourceHandle, codegen.build(), job_id, detach_run=detach_run, - spot_dag=task.spot_dag) + managed_job_dag=task.managed_job_dag) diff --git a/sky/cli.py b/sky/cli.py index 72667cffc97..485703e4caf 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -51,10 +51,10 @@ from sky import core from sky import exceptions from sky import global_user_state +from sky import jobs as managed_jobs from sky import provision as provision_lib from sky import serve as serve_lib from sky import sky_logging -from sky import spot as spot_lib from sky import status_lib from sky.adaptors import common as adaptors_common from sky.backends import backend_utils @@ -91,9 +91,9 @@ provision a new cluster with that name. Otherwise provision a new cluster with an autogenerated name.""" -# The maximum number of in-progress spot jobs to show in the status +# The maximum number of in-progress managed jobs to show in the status # command. -_NUM_SPOT_JOBS_TO_SHOW_IN_STATUS = 5 +_NUM_MANAGED_JOBS_TO_SHOW_IN_STATUS = 5 _STATUS_PROPERTY_CLUSTER_NUM_ERROR_MESSAGE = ( '{cluster_num} cluster{plural} {verb}. Please specify {cause} ' @@ -103,7 +103,7 @@ 'please retry after a while.') _DAG_NOT_SUPPORTED_MESSAGE = ('YAML specifies a DAG which is only supported by ' - '`sky spot launch`. `{command}` supports a ' + '`sky jobs launch`. `{command}` supports a ' 'single task only.') @@ -708,8 +708,8 @@ def _make_task_or_dag_from_entrypoint_with_overrides( ports: Optional[Tuple[str]] = None, env: Optional[List[Tuple[str, str]]] = None, field_to_ignore: Optional[List[str]] = None, - # spot launch specific - spot_recovery: Optional[str] = None, + # job launch specific + job_recovery: Optional[str] = None, ) -> Union[sky.Task, sky.Dag]: """Creates a task or a dag from an entrypoint with overrides. @@ -777,9 +777,9 @@ def _make_task_or_dag_from_entrypoint_with_overrides( if workdir is not None: task.workdir = workdir - # Spot launch specific. - if spot_recovery is not None: - override_params['spot_recovery'] = spot_recovery + # job launch specific. + if job_recovery is not None: + override_params['job_recovery'] = job_recovery task.set_resources_override(override_params) @@ -816,13 +816,30 @@ def get_help(self, ctx): return super().get_help(ctx) -def _with_deprecation_warning(f, original_name, alias_name): +def _with_deprecation_warning( + f, + original_name: str, + alias_name: str, + override_command_argument: Optional[Dict[str, Any]] = None): @functools.wraps(f) def wrapper(self, *args, **kwargs): + override_str = '' + if override_command_argument is not None: + overrides = [] + for k, v in override_command_argument.items(): + if isinstance(v, bool): + if v: + overrides.append(f'--{k}') + else: + overrides.append(f'--no-{k}') + else: + overrides.append(f'--{k.replace("_", "-")}={v}') + override_str = ' with additional arguments ' + ' '.join(overrides) click.secho( - f'WARNING: `{alias_name}` is deprecated and will be removed in a ' - f'future release. Please use `{original_name}` instead.\n', + f'WARNING: `{alias_name}` has been renamed to `{original_name}` ' + f'and will be removed in a future release. Please use the ' + f'latter{override_str} instead.\n', err=True, fg='yellow') return f(self, *args, **kwargs) @@ -830,17 +847,49 @@ def wrapper(self, *args, **kwargs): return wrapper -def _add_command_alias_to_group(group, command, name, hidden): +def _override_arguments(callback, override_command_argument: Dict[str, Any]): + + def wrapper(*args, **kwargs): + logger.info(f'Overriding arguments: {override_command_argument}') + kwargs.update(override_command_argument) + return callback(*args, **kwargs) + + return wrapper + + +def _add_command_alias( + group: click.Group, + command: click.Command, + hidden: bool = False, + new_group: Optional[click.Group] = None, + new_command_name: Optional[str] = None, + override_command_argument: Optional[Dict[str, Any]] = None, + with_warning: bool = True, +) -> None: """Add a alias of a command to a group.""" + if new_group is None: + new_group = group + if new_command_name is None: + new_command_name = command.name + if new_group == group and new_command_name == command.name: + raise ValueError('Cannot add an alias to the same command.') new_command = copy.deepcopy(command) new_command.hidden = hidden - new_command.name = name + new_command.name = new_command_name + + if override_command_argument: + new_command.callback = _override_arguments(new_command.callback, + override_command_argument) orig = f'sky {group.name} {command.name}' - alias = f'sky {group.name} {name}' - new_command.invoke = _with_deprecation_warning(new_command.invoke, orig, - alias) - group.add_command(new_command, name=name) + alias = f'sky {new_group.name} {new_command_name}' + if with_warning: + new_command.invoke = _with_deprecation_warning( + new_command.invoke, + orig, + alias, + override_command_argument=override_command_argument) + new_group.add_command(new_command, name=new_command_name) def _deprecate_and_hide_command(group, command_to_deprecate, @@ -1219,30 +1268,30 @@ def exec( sky.exec(task, backend=backend, cluster_name=cluster, detach_run=detach_run) -def _get_spot_jobs( +def _get_managed_jobs( refresh: bool, skip_finished: bool, show_all: bool, limit_num_jobs_to_show: bool = False, is_called_by_user: bool = False) -> Tuple[Optional[int], str]: - """Get the in-progress spot jobs. + """Get the in-progress managed jobs. Args: - refresh: Query the latest statuses, restarting the spot controller if + refresh: Query the latest statuses, restarting the jobs controller if stopped. skip_finished: Show only in-progress jobs. - show_all: Show all information of each spot job (e.g., region, price). + show_all: Show all information of each job (e.g., region, price). limit_num_jobs_to_show: If True, limit the number of jobs to show to - _NUM_SPOT_JOBS_TO_SHOW_IN_STATUS, which is mainly used by + _NUM_MANAGED_JOBS_TO_SHOW_IN_STATUS, which is mainly used by `sky status`. is_called_by_user: If this function is called by user directly, or an internal call. Returns: A tuple of (num_in_progress_jobs, msg). If num_in_progress_jobs is None, - it means there is an error when querying the spot jobs. In this case, + it means there is an error when querying the managed jobs. In this case, msg contains the error message. Otherwise, msg contains the formatted - spot job table. + managed job table. """ num_in_progress_jobs = None try: @@ -1250,32 +1299,51 @@ def _get_spot_jobs( usage_lib.messages.usage.set_internal() with sky_logging.silent(): # Make the call silent - spot_jobs = spot_lib.queue(refresh=refresh, - skip_finished=skip_finished) - num_in_progress_jobs = len(spot_jobs) + managed_jobs_ = managed_jobs.queue(refresh=refresh, + skip_finished=skip_finished) + num_in_progress_jobs = len(set(job['job_id'] for job in managed_jobs_)) except exceptions.ClusterNotUpError as e: controller_status = e.cluster_status msg = str(e) if controller_status is None: - msg += (f' (See: {colorama.Style.BRIGHT}sky spot -h' + msg += (f' (See: {colorama.Style.BRIGHT}sky jobs -h' f'{colorama.Style.RESET_ALL})') elif (controller_status == status_lib.ClusterStatus.STOPPED and is_called_by_user): - msg += (f' (See finished jobs: {colorama.Style.BRIGHT}' - f'sky spot queue --refresh{colorama.Style.RESET_ALL})') + msg += (f' (See finished managed jobs: {colorama.Style.BRIGHT}' + f'sky jobs queue --refresh{colorama.Style.RESET_ALL})') except RuntimeError as e: - msg = ('Failed to query spot jobs due to connection ' - 'issues. Try again later. ' - f'Details: {common_utils.format_exception(e, use_bracket=True)}') + msg = '' + try: + # Check the controller status again, as the RuntimeError is likely + # due to the controller being autostopped when querying the jobs. + controller_type = controller_utils.Controllers.JOBS_CONTROLLER + record = backend_utils.refresh_cluster_record( + controller_type.value.cluster_name, + cluster_status_lock_timeout=0) + if (record is None or + record['status'] == status_lib.ClusterStatus.STOPPED): + msg = controller_type.value.default_hint_if_non_existent + except Exception: # pylint: disable=broad-except + # This is to an best effort to find the latest controller status to + # print more helpful message, so we can ignore any exception to + # print the original error. + pass + if not msg: + msg = ( + 'Failed to query managed jobs due to connection ' + 'issues. Try again later. ' + f'Details: {common_utils.format_exception(e, use_bracket=True)}' + ) except Exception as e: # pylint: disable=broad-except - msg = ('Failed to query spot jobs: ' + msg = ('Failed to query managed jobs: ' f'{common_utils.format_exception(e, use_bracket=True)}') else: - max_jobs_to_show = (_NUM_SPOT_JOBS_TO_SHOW_IN_STATUS + max_jobs_to_show = (_NUM_MANAGED_JOBS_TO_SHOW_IN_STATUS if limit_num_jobs_to_show else None) - msg = spot_lib.format_job_table(spot_jobs, - show_all=show_all, - max_jobs=max_jobs_to_show) + msg = managed_jobs.format_job_table(managed_jobs_, + show_all=show_all, + max_jobs=max_jobs_to_show) return num_in_progress_jobs, msg @@ -1314,9 +1382,27 @@ def _get_services(service_names: Optional[List[str]], msg += (f' (See: {colorama.Style.BRIGHT}sky serve -h' f'{colorama.Style.RESET_ALL})') except RuntimeError as e: - msg = ('Failed to fetch service statuses due to connection issues. ' - 'Please try again later. Details: ' - f'{common_utils.format_exception(e, use_bracket=True)}') + msg = '' + try: + # Check the controller status again, as the RuntimeError is likely + # due to the controller being autostopped when querying the + # services. + controller_type = controller_utils.Controllers.SKY_SERVE_CONTROLLER + record = backend_utils.refresh_cluster_record( + controller_type.value.cluster_name, + cluster_status_lock_timeout=0) + if (record is None or + record['status'] == status_lib.ClusterStatus.STOPPED): + msg = controller_type.value.default_hint_if_non_existent + except Exception: # pylint: disable=broad-except + # This is to an best effort to find the latest controller status to + # print more helpful message, so we can ignore any exception to + # print the original error. + pass + if not msg: + msg = ('Failed to fetch service statuses due to connection issues. ' + 'Please try again later. Details: ' + f'{common_utils.format_exception(e, use_bracket=True)}') except Exception as e: # pylint: disable=broad-except msg = ('Failed to fetch service statuses: ' f'{common_utils.format_exception(e, use_bracket=True)}') @@ -1380,11 +1466,11 @@ def _get_services(service_names: Optional[List[str]], type=int, help=('Get the endpoint URL for the specified port number on the ' 'cluster. This option will override all other options.')) -@click.option('--show-spot-jobs/--no-show-spot-jobs', +@click.option('--show-managed-jobs/--no-show-managed-jobs', default=True, is_flag=True, required=False, - help='Also show recent in-progress spot jobs, if any.') + help='Also show recent in-progress managed jobs, if any.') @click.option('--show-services/--no-show-services', default=True, is_flag=True, @@ -1398,8 +1484,8 @@ def _get_services(service_names: Optional[List[str]], @usage_lib.entrypoint # pylint: disable=redefined-builtin def status(all: bool, refresh: bool, ip: bool, endpoints: bool, - endpoint: Optional[int], show_spot_jobs: bool, show_services: bool, - clusters: List[str]): + endpoint: Optional[int], show_managed_jobs: bool, + show_services: bool, clusters: List[str]): # NOTE(dev): Keep the docstring consistent between the Python API and CLI. """Show clusters. @@ -1458,19 +1544,20 @@ def status(all: bool, refresh: bool, ip: bool, endpoints: bool, or for autostop-enabled clusters, use ``--refresh`` to query the latest cluster statuses from the cloud providers. """ - # Using a pool with 2 worker to run the spot job query and sky serve service - # query in parallel to speed up. The pool provides a AsyncResult object that - # can be used as a future. + # Using a pool with 2 worker to run the managed job query and sky serve + # service query in parallel to speed up. The pool provides a AsyncResult + # object that can be used as a future. with multiprocessing.Pool(2) as pool: - # Do not show spot queue if user specifies clusters, and if user + # Do not show job queue if user specifies clusters, and if user # specifies --ip or --endpoint(s). - show_spot_jobs = show_spot_jobs and not any([clusters, ip, endpoints]) + show_managed_jobs = show_managed_jobs and not any( + [clusters, ip, endpoints]) show_endpoints = endpoints or endpoint is not None show_single_endpoint = endpoint is not None - if show_spot_jobs: - # Run the spot job query in parallel to speed up the status query. - spot_jobs_future = pool.apply_async( - _get_spot_jobs, + if show_managed_jobs: + # Run managed job query in parallel to speed up the status query. + managed_jobs_future = pool.apply_async( + _get_managed_jobs, kwds=dict(refresh=False, skip_finished=True, show_all=False, @@ -1655,16 +1742,16 @@ def _try_get_future_result(future) -> Tuple[bool, Any]: interrupted = True return interrupted, result - spot_jobs_query_interrupted = False - if show_spot_jobs: + managed_jobs_query_interrupted = False + if show_managed_jobs: click.echo(f'\n{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' - f'Managed spot jobs{colorama.Style.RESET_ALL}') - with rich_utils.safe_status('[cyan]Checking spot jobs[/]'): - spot_jobs_query_interrupted, result = _try_get_future_result( - spot_jobs_future) - if spot_jobs_query_interrupted: + f'Managed jobs{colorama.Style.RESET_ALL}') + with rich_utils.safe_status('[cyan]Checking managed jobs[/]'): + managed_jobs_query_interrupted, result = _try_get_future_result( + managed_jobs_future) + if managed_jobs_query_interrupted: # Set to -1, so that the controller is not considered - # down, and the hint for showing sky spot queue + # down, and the hint for showing sky jobs queue # will still be shown. num_in_progress_jobs = -1 msg = 'KeyboardInterrupt' @@ -1673,29 +1760,30 @@ def _try_get_future_result(future) -> Tuple[bool, Any]: click.echo(msg) if num_in_progress_jobs is not None: - # spot controller is UP. + # jobs controller is UP. job_info = '' if num_in_progress_jobs > 0: plural_and_verb = ' is' if num_in_progress_jobs > 1: plural_and_verb = 's are' job_info = ( - f'{num_in_progress_jobs} spot job{plural_and_verb} ' + f'{num_in_progress_jobs} managed job{plural_and_verb} ' 'in progress') - if num_in_progress_jobs > _NUM_SPOT_JOBS_TO_SHOW_IN_STATUS: + if (num_in_progress_jobs > + _NUM_MANAGED_JOBS_TO_SHOW_IN_STATUS): job_info += ( - f' ({_NUM_SPOT_JOBS_TO_SHOW_IN_STATUS} latest ones ' - 'shown)') + f' ({_NUM_MANAGED_JOBS_TO_SHOW_IN_STATUS} latest ' + 'ones shown)') job_info += '. ' hints.append( - controller_utils.Controllers.SPOT_CONTROLLER.value. + controller_utils.Controllers.JOBS_CONTROLLER.value. in_progress_hint.format(job_info=job_info)) if show_services: click.echo(f'\n{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' f'Services{colorama.Style.RESET_ALL}') num_services = None - if spot_jobs_query_interrupted: + if managed_jobs_query_interrupted: # The pool is terminated, so we cannot run the service query. msg = 'KeyboardInterrupt' else: @@ -1712,7 +1800,7 @@ def _try_get_future_result(future) -> Tuple[bool, Any]: hints.append(controller_utils.Controllers.SKY_SERVE_CONTROLLER. value.in_progress_hint) - if show_spot_jobs or show_services: + if show_managed_jobs or show_services: try: pool.close() pool.join() @@ -1983,7 +2071,7 @@ def logs( help='Skip confirmation prompt.') @click.argument('jobs', required=False, type=int, nargs=-1) @usage_lib.entrypoint -def cancel(cluster: str, all: bool, jobs: List[int], yes: bool): # pylint: disable=redefined-builtin +def cancel(cluster: str, all: bool, jobs: List[int], yes: bool): # pylint: disable=redefined-builtin, redefined-outer-name # NOTE(dev): Keep the docstring consistent between the Python API and CLI. """Cancel job(s). @@ -2382,7 +2470,7 @@ def start( if not to_start: return - # Checks for controller clusters (spot controller / sky serve controller). + # Checks for controller clusters (jobs controller / sky serve controller). controllers, normal_clusters = [], [] for name in to_start: if controller_utils.Controllers.from_name(name) is not None: @@ -2501,14 +2589,15 @@ def down( purge=purge) -def _hint_or_raise_for_down_spot_controller(controller_name: str): +def _hint_or_raise_for_down_jobs_controller(controller_name: str): controller = controller_utils.Controllers.from_name(controller_name) assert controller is not None, controller_name with rich_utils.safe_status( - '[bold cyan]Checking for in-progress spot jobs[/]'): + '[bold cyan]Checking for in-progress managed jobs[/]'): try: - spot_jobs = spot_lib.queue(refresh=False, skip_finished=True) + managed_jobs_ = managed_jobs.queue(refresh=False, + skip_finished=True) except exceptions.ClusterNotUpError as e: if controller.value.connection_error_hint in str(e): with ux_utils.print_exception_no_traceback(): @@ -2517,21 +2606,21 @@ def _hint_or_raise_for_down_spot_controller(controller_name: str): decline_down_when_failed_to_fetch_status_hint) if e.cluster_status is None: click.echo( - 'Managed spot controller has already been torn down.') + 'Managed jobs controller has already been torn down.') sys.exit(0) - # At this point, the spot jobs are failed to be fetched due to the - # controller being STOPPED or being firstly launched, i.e., there is - # no in-prgress spot jobs. - spot_jobs = [] + # At this point, the managed jobs are failed to be fetched due to + # the controller being STOPPED or being firstly launched, i.e., + # there is no in-prgress managed jobs. + managed_jobs_ = [] msg = (f'{colorama.Fore.YELLOW}WARNING: Tearing down the managed ' - 'spot controller. Please be aware of the following:' + 'jobs controller. Please be aware of the following:' f'{colorama.Style.RESET_ALL}' - '\n * All logs and status information of the spot ' - 'jobs (output of `sky spot queue`) will be lost.') + '\n * All logs and status information of the managed ' + 'jobs (output of `sky jobs queue`) will be lost.') click.echo(msg) - if spot_jobs: - job_table = spot_lib.format_job_table(spot_jobs, show_all=False) + if managed_jobs_: + job_table = managed_jobs.format_job_table(managed_jobs_, show_all=False) msg = controller.value.decline_down_for_dirty_controller_hint # Add prefix to each line to align with the bullet point. msg += '\n'.join( @@ -2539,7 +2628,7 @@ def _hint_or_raise_for_down_spot_controller(controller_name: str): with ux_utils.print_exception_no_traceback(): raise exceptions.NotSupportedError(msg) else: - click.echo(' * No in-progress spot jobs found. It should be safe to ' + click.echo(' * No in-progress managed jobs found. It should be safe to ' 'terminate (see caveats above).') @@ -2575,8 +2664,8 @@ def _hint_or_raise_for_down_sky_serve_controller(controller_name: str): _CONTROLLER_TO_HINT_OR_RAISE = { - controller_utils.Controllers.SPOT_CONTROLLER: - (_hint_or_raise_for_down_spot_controller), + controller_utils.Controllers.JOBS_CONTROLLER: + (_hint_or_raise_for_down_jobs_controller), controller_utils.Controllers.SKY_SERVE_CONTROLLER: (_hint_or_raise_for_down_sky_serve_controller), } @@ -2591,9 +2680,9 @@ def _down_or_stop_clusters( idle_minutes_to_autostop: Optional[int] = None) -> None: """Tears down or (auto-)stops a cluster (or all clusters). - Controllers (spot controller and sky serve controller) can only be + Controllers (jobs controller and sky serve controller) can only be terminated if the cluster name is explicitly and uniquely specified (not - via glob) and purge is set to True. + via glob). """ if down: command = 'down' @@ -2662,10 +2751,10 @@ def _down_or_stop_clusters( # TODO(zhwu): This hint or raise is not transactional, which # means even if it passed the check with no in-progress spot # or service and prompt the confirmation for termination, - # a user could still do a `sky spot launch` or a + # a user could still do a `sky jobs launch` or a # `sky serve up` before typing the delete, causing a leaked - # spot job or service. We should make this check atomic with - # the termination. + # managed job or service. We should make this check atomic + # with the termination. hint_or_raise(controller_name) except exceptions.ClusterOwnerIdentityMismatchError as e: if purge: @@ -3147,12 +3236,12 @@ def bench(): @cli.group(cls=_NaturalOrderGroup) -def spot(): - """Managed Spot CLI (spot instances with auto-recovery).""" +def jobs(): + """Managed Jobs CLI (jobs with auto-recovery).""" pass -@spot.command('launch', cls=_DocumentedCodeCommand) +@jobs.command('launch', cls=_DocumentedCodeCommand) @click.argument('entrypoint', required=True, type=str, @@ -3160,10 +3249,10 @@ def spot(): **_get_shell_complete_args(_complete_file_name)) # TODO(zhwu): Add --dryrun option to test the launch command. @_add_click_options(_TASK_OPTIONS_WITH_NAME + _EXTRA_RESOURCES_OPTIONS) -@click.option('--spot-recovery', +@click.option('--job-recovery', default=None, type=str, - help='Spot recovery strategy to use for the managed spot task.') + help='Recovery strategy to use for managed jobs.') @click.option( '--detach-run', '-d', @@ -3181,8 +3270,8 @@ def spot(): '(Default: True; this flag is deprecated and will be removed in a ' 'future release.) Whether to retry provisioning infinitely until the ' 'cluster is up, if unavailability errors are encountered. This ' # pylint: disable=bad-docstring-quotes - 'applies to launching the spot clusters (both the initial and any ' - 'recovery attempts), not the spot controller.')) + 'applies to launching all managed jobs (both the initial and ' + 'any recovery attempts), not the jobs controller.')) @click.option('--yes', '-y', is_flag=True, @@ -3191,7 +3280,7 @@ def spot(): help='Skip confirmation prompt.') @timeline.event @usage_lib.entrypoint -def spot_launch( +def jobs_launch( entrypoint: List[str], name: Optional[str], workdir: Optional[str], @@ -3205,7 +3294,7 @@ def spot_launch( num_nodes: Optional[int], use_spot: Optional[bool], image_id: Optional[str], - spot_recovery: Optional[str], + job_recovery: Optional[str], env_file: Optional[Dict[str, str]], env: List[Tuple[str, str]], disk_size: Optional[int], @@ -3215,7 +3304,7 @@ def spot_launch( retry_until_up: bool, yes: bool, ): - """Launch a managed spot job from a YAML or a command. + """Launch a managed job from a YAML or a command. If ENTRYPOINT points to a valid YAML file, it is read in as the task specification. Otherwise, it is interpreted as a bash command. @@ -3225,9 +3314,9 @@ def spot_launch( .. code-block:: bash # You can use normal task YAMLs. - sky spot launch task.yaml + sky jobs launch task.yaml - sky spot launch 'echo hello!' + sky jobs launch 'echo hello!' """ env = _merge_env_vars(env_file, env) task_or_dag = _make_task_or_dag_from_entrypoint_with_overrides( @@ -3248,16 +3337,17 @@ def spot_launch( disk_size=disk_size, disk_tier=disk_tier, ports=ports, - spot_recovery=spot_recovery, + job_recovery=job_recovery, ) - # Deprecation. + # Deprecation. We set the default behavior to be retry until up, and the + # flag `--retry-until-up` is deprecated. We can remove the flag in 0.8.0. if retry_until_up is not None: flag_str = '--retry-until-up' if not retry_until_up: flag_str = '--no-retry-until-up' click.secho( f'Flag {flag_str} is deprecated and will be removed in a ' - 'future release (managed spot jobs will always be retried). ' + 'future release (managed jobs will always be retried). ' 'Please file an issue if this does not work for you.', fg='yellow') else: @@ -3275,27 +3365,26 @@ def spot_launch( dag.name = name dag_utils.maybe_infer_and_fill_dag_and_task_names(dag) - dag_utils.fill_default_spot_config_in_dag_for_spot_launch(dag) + dag_utils.fill_default_config_in_dag_for_job_launch(dag) - click.secho( - f'Managed spot job {dag.name!r} will be launched on (estimated):', - fg='yellow') + click.secho(f'Managed job {dag.name!r} will be launched on (estimated):', + fg='yellow') dag = sky.optimize(dag) if not yes: - prompt = f'Launching the spot job {dag.name!r}. Proceed?' + prompt = f'Launching a managed job {dag.name!r}. Proceed?' if prompt is not None: click.confirm(prompt, default=True, abort=True, show_default=True) common_utils.check_cluster_name_is_valid(name) - spot_lib.launch(dag, - name, - detach_run=detach_run, - retry_until_up=retry_until_up) + managed_jobs.launch(dag, + name, + detach_run=detach_run, + retry_until_up=retry_until_up) -@spot.command('queue', cls=_DocumentedCodeCommand) +@jobs.command('queue', cls=_DocumentedCodeCommand) @click.option('--all', '-a', default=False, @@ -3308,7 +3397,7 @@ def spot_launch( default=False, is_flag=True, required=False, - help='Query the latest statuses, restarting the spot controller if stopped.' + help='Query the latest statuses, restarting the jobs controller if stopped.' ) @click.option('--skip-finished', '-s', @@ -3318,21 +3407,21 @@ def spot_launch( help='Show only pending/running jobs\' information.') @usage_lib.entrypoint # pylint: disable=redefined-builtin -def spot_queue(all: bool, refresh: bool, skip_finished: bool): - """Show statuses of managed spot jobs. +def jobs_queue(all: bool, refresh: bool, skip_finished: bool): + """Show statuses of managed jobs. - Each spot job can have one of the following statuses: + Each managed jobs can have one of the following statuses: - - ``PENDING``: Job is waiting for a free slot on the spot controller to be + - ``PENDING``: Job is waiting for a free slot on the jobs controller to be accepted. - - ``SUBMITTED``: Job is submitted to and accepted by the spot controller. + - ``SUBMITTED``: Job is submitted to and accepted by the jobs controller. - - ``STARTING``: Job is starting (provisioning a spot cluster). + - ``STARTING``: Job is starting (provisioning a cluster for the job). - ``RUNNING``: Job is running. - - ``RECOVERING``: The spot cluster is recovering from a preemption. + - ``RECOVERING``: The cluster of the job is recovering from a preemption. - ``SUCCEEDED``: Job succeeded. @@ -3355,12 +3444,12 @@ def spot_queue(all: bool, refresh: bool, skip_finished: bool): - ``FAILED_CONTROLLER``: Job failed due to an unexpected error in the spot controller. - If the job failed, either due to user code or spot unavailability, the - error log can be found with ``sky spot logs --controller``, e.g.: + If the job failed, either due to user code or resource unavailability, the + error log can be found with ``sky jobs logs --controller``, e.g.: .. code-block:: bash - sky spot logs --controller job_id + sky jobs logs --controller job_id This also shows the logs for provisioning and any preemption and recovery attempts. @@ -3369,37 +3458,37 @@ def spot_queue(all: bool, refresh: bool, skip_finished: bool): .. code-block:: bash - watch -n60 sky spot queue + watch -n60 sky jobs queue """ - click.secho('Fetching managed spot job statuses...', fg='yellow') - with rich_utils.safe_status('[cyan]Checking spot jobs[/]'): - _, msg = _get_spot_jobs(refresh=refresh, - skip_finished=skip_finished, - show_all=all, - is_called_by_user=True) + click.secho('Fetching managed job statuses...', fg='yellow') + with rich_utils.safe_status('[cyan]Checking managed jobs[/]'): + _, msg = _get_managed_jobs(refresh=refresh, + skip_finished=skip_finished, + show_all=all, + is_called_by_user=True) if not skip_finished: in_progress_only_hint = '' else: in_progress_only_hint = ' (showing in-progress jobs only)' click.echo(f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' - f'Managed spot jobs{colorama.Style.RESET_ALL}' + f'Managed jobs{colorama.Style.RESET_ALL}' f'{in_progress_only_hint}\n{msg}') -@spot.command('cancel', cls=_DocumentedCodeCommand) +@jobs.command('cancel', cls=_DocumentedCodeCommand) @click.option('--name', '-n', required=False, type=str, - help='Managed spot job name to cancel.') + help='Managed job name to cancel.') @click.argument('job_ids', default=None, type=int, required=False, nargs=-1) @click.option('--all', '-a', is_flag=True, default=False, required=False, - help='Cancel all managed spot jobs.') + help='Cancel all managed jobs.') @click.option('--yes', '-y', is_flag=True, @@ -3408,8 +3497,8 @@ def spot_queue(all: bool, refresh: bool, skip_finished: bool): help='Skip confirmation prompt.') @usage_lib.entrypoint # pylint: disable=redefined-builtin -def spot_cancel(name: Optional[str], job_ids: Tuple[int], all: bool, yes: bool): - """Cancel managed spot jobs. +def jobs_cancel(name: Optional[str], job_ids: Tuple[int], all: bool, yes: bool): + """Cancel managed jobs. You can provide either a job name or a list of job IDs to be cancelled. They are exclusive options. @@ -3418,15 +3507,15 @@ def spot_cancel(name: Optional[str], job_ids: Tuple[int], all: bool, yes: bool): .. code-block:: bash - # Cancel managed spot job with name 'my-job' - $ sky spot cancel -n my-job + # Cancel managed job with name 'my-job' + $ sky jobs cancel -n my-job \b - # Cancel managed spot jobs with IDs 1, 2, 3 - $ sky spot cancel 1 2 3 + # Cancel managed jobs with IDs 1, 2, 3 + $ sky jobs cancel 1 2 3 """ backend_utils.is_controller_accessible( - controller_type=controller_utils.Controllers.SPOT_CONTROLLER, - stopped_message='All managed spot jobs should have finished.', + controller=controller_utils.Controllers.JOBS_CONTROLLER, + stopped_message='All managed jobs should have finished.', exit_if_not_accessible=True) job_id_str = ','.join(map(str, job_ids)) @@ -3439,24 +3528,24 @@ def spot_cancel(name: Optional[str], job_ids: Tuple[int], all: bool, yes: bool): f'Provided {argument_str!r}.') if not yes: - job_identity_str = (f'managed spot jobs with IDs {job_id_str}' + job_identity_str = (f'managed jobs with IDs {job_id_str}' if job_ids else repr(name)) if all: - job_identity_str = 'all managed spot jobs' + job_identity_str = 'all managed jobs' click.confirm(f'Cancelling {job_identity_str}. Proceed?', default=True, abort=True, show_default=True) - spot_lib.cancel(job_ids=job_ids, name=name, all=all) + managed_jobs.cancel(job_ids=job_ids, name=name, all=all) -@spot.command('logs', cls=_DocumentedCodeCommand) +@jobs.command('logs', cls=_DocumentedCodeCommand) @click.option('--name', '-n', required=False, type=str, - help='Managed spot job name.') + help='Managed job name.') @click.option( '--follow/--no-follow', is_flag=True, @@ -3471,22 +3560,23 @@ def spot_cancel(name: Optional[str], job_ids: Tuple[int], all: bool, yes: bool): 'launching/recoveries, etc.')) @click.argument('job_id', required=False, type=int) @usage_lib.entrypoint -def spot_logs(name: Optional[str], job_id: Optional[int], follow: bool, +def jobs_logs(name: Optional[str], job_id: Optional[int], follow: bool, controller: bool): - """Tail the log of a managed spot job.""" + """Tail the log of a managed job.""" try: if controller: - core.tail_logs(spot_lib.SPOT_CONTROLLER_NAME, - job_id=job_id, - follow=follow) + core.tail_logs( + controller_utils.Controllers.JOBS_CONTROLLER.value.cluster_name, + job_id=job_id, + follow=follow) else: - spot_lib.tail_logs(name=name, job_id=job_id, follow=follow) + managed_jobs.tail_logs(name=name, job_id=job_id, follow=follow) except exceptions.ClusterNotUpError as e: click.echo(e) sys.exit(1) -@spot.command('dashboard', cls=_DocumentedCodeCommand) +@jobs.command('dashboard', cls=_DocumentedCodeCommand) @click.option( '--port', '-p', @@ -3496,19 +3586,18 @@ def spot_logs(name: Optional[str], job_id: Optional[int], follow: bool, help=('Local port to use for the dashboard. If None, a free port is ' 'automatically chosen.')) @usage_lib.entrypoint -def spot_dashboard(port: Optional[int]): - """Opens a dashboard for spot jobs (needs controller to be UP).""" +def jobs_dashboard(port: Optional[int]): + """Opens a dashboard for managed jobs (needs controller to be UP).""" # TODO(zongheng): ideally, the controller/dashboard server should expose the # API perhaps via REST. Then here we would (1) not have to use SSH to try to # see if the controller is UP first, which is slow; (2) not have to run SSH # port forwarding first (we'd just launch a local dashboard which would make # REST API calls to the controller dashboard server). - click.secho('Checking if spot controller is up...', fg='yellow') - hint = ( - 'Dashboard is not available if spot controller is not up. Run a spot ' - 'job first.') + click.secho('Checking if jobs controller is up...', fg='yellow') + hint = ('Dashboard is not available if jobs controller is not up. Run a ' + 'managed job first.') backend_utils.is_controller_accessible( - controller_type=controller_utils.Controllers.SPOT_CONTROLLER, + controller=controller_utils.Controllers.JOBS_CONTROLLER, stopped_message=hint, non_existent_message=hint, exit_if_not_accessible=True) @@ -3519,8 +3608,9 @@ def spot_dashboard(port: Optional[int]): free_port = common_utils.find_free_port(remote_port) else: free_port = port - ssh_command = (f'ssh -qNL {free_port}:localhost:{remote_port} ' - f'{spot_lib.SPOT_CONTROLLER_NAME}') + ssh_command = ( + f'ssh -qNL {free_port}:localhost:{remote_port} ' + f'{controller_utils.Controllers.JOBS_CONTROLLER.value.cluster_name}') click.echo('Forwarding port: ', nl=False) click.secho(f'{ssh_command}', dim=True) @@ -3539,12 +3629,31 @@ def spot_dashboard(port: Optional[int]): try: os.killpg(os.getpgid(ssh_process.pid), signal.SIGTERM) except ProcessLookupError: - # This happens if spot controller is auto-stopped. + # This happens if jobs controller is auto-stopped. pass finally: click.echo('Exiting.') +# TODO(zhwu): Backward compatibility for the old `sky spot launch` command. +# It is now renamed to `sky jobs launch` and the old command is deprecated. +# Remove in v0.8.0. +@cli.group(cls=_NaturalOrderGroup) +def spot(): + """Alias for Managed Jobs CLI (default to managed spot jobs).""" + pass + + +_add_command_alias(jobs, + jobs_launch, + new_group=spot, + override_command_argument={'use_spot': True}) +_add_command_alias(jobs, jobs_queue, new_group=spot) +_add_command_alias(jobs, jobs_logs, new_group=spot) +_add_command_alias(jobs, jobs_cancel, new_group=spot) +_add_command_alias(jobs, jobs_dashboard, new_group=spot) + + @cli.group(cls=_NaturalOrderGroup) def serve(): """SkyServe CLI (multi-region, multi-cloud serving).""" @@ -4040,7 +4149,7 @@ def serve_down(service_names: List[str], all: bool, purge: bool, yes: bool): f'Provided {argument_str!r}.') backend_utils.is_controller_accessible( - controller_type=controller_utils.Controllers.SKY_SERVE_CONTROLLER, + controller=controller_utils.Controllers.SKY_SERVE_CONTROLLER, stopped_message='All services should have been terminated.', exit_if_not_accessible=True) diff --git a/sky/clouds/aws.py b/sky/clouds/aws.py index 542d1595d86..1fef481d8d0 100644 --- a/sky/clouds/aws.py +++ b/sky/clouds/aws.py @@ -37,7 +37,7 @@ # It has the following purposes: # - make all nodes (any cloud) able to access private S3 buckets # - make some remote nodes able to launch new nodes on AWS (i.e., makes -# AWS head node able to launch AWS workers, or any-cloud spot controller +# AWS head node able to launch AWS workers, or any-cloud jobs controller # able to launch spot clusters on AWS). # # If we detect the current user identity is AWS SSO, we will not upload this @@ -541,9 +541,9 @@ def check_credentials(cls) -> Tuple[bool, Optional[str]]: elif identity_type == AWSIdentityType.IAM_ROLE: # When using an IAM role, the credentials may not exist in the # ~/.aws/credentials file. So we don't check for the existence of the - # file. This will happen when the user is on a VM (or spot-controller) - # created by an SSO account, i.e. the VM will be assigned the IAM - # role: skypilot-v1. + # file. This will happen when the user is on a VM (or + # jobs-controller) created by an SSO account, i.e. the VM will be + # assigned the IAM role: skypilot-v1. hints = f'AWS IAM role is set.{single_cloud_hint}' else: # This file is required because it is required by the VMs launched on @@ -745,7 +745,7 @@ def get_credential_file_mounts(self) -> Dict[str, str]: # credentials. We need to define a mechanism to find out the cloud # provider of the cluster to be launched in this function and make sure # the cluster will not be used for launching clusters in other clouds, - # e.g. spot controller. + # e.g. jobs controller. if self._current_identity_type( ) != AWSIdentityType.SHARED_CREDENTIALS_FILE: return {} diff --git a/sky/clouds/cloud.py b/sky/clouds/cloud.py index d8e77f6f194..889e6716074 100644 --- a/sky/clouds/cloud.py +++ b/sky/clouds/cloud.py @@ -42,7 +42,7 @@ class CloudImplementationFeatures(enum.Enum): CUSTOM_DISK_TIER = 'custom_disk_tier' OPEN_PORTS = 'open_ports' STORAGE_MOUNTING = 'storage_mounting' - HOST_CONTROLLERS = 'host_controllers' # Can run spot/serve controllers + HOST_CONTROLLERS = 'host_controllers' # Can run jobs/serve controllers class Region(collections.namedtuple('Region', ['name'])): @@ -496,15 +496,16 @@ def validate_region_zone( zone, clouds=self._REPR.lower()) - def need_cleanup_after_preemption( + def need_cleanup_after_preemption_or_failure( self, resources: 'resources_lib.Resources') -> bool: - """Returns whether a spot resource needs cleanup after preeemption. + """Whether a resource needs cleanup after preeemption or failure. In most cases, spot resources do not need cleanup after preemption, as long as the cluster can be relaunched with the same name and tag, no matter the preemption behavior is to terminate or stop the cluster. - The only exception by far is GCP's Spot TPU VM. We override this method - in gcp.py. + Similar for on-demand resources that go into maintenance mode. The + only exception by far is GCP's TPU VM. We override this method in + gcp.py. """ del resources return False diff --git a/sky/clouds/gcp.py b/sky/clouds/gcp.py index 4557fd18678..7babf34ac52 100644 --- a/sky/clouds/gcp.py +++ b/sky/clouds/gcp.py @@ -841,13 +841,14 @@ def get_current_user_identity_str(cls) -> Optional[str]: def instance_type_exists(self, instance_type): return service_catalog.instance_type_exists(instance_type, 'gcp') - def need_cleanup_after_preemption(self, - resources: 'resources.Resources') -> bool: - """Returns whether a spot resource needs cleanup after preeemption.""" + def need_cleanup_after_preemption_or_failure( + self, resources: 'resources.Resources') -> bool: + """Whether a resource needs cleanup after preeemption or failure.""" # Spot TPU VMs require manual cleanup after preemption. # "If your Cloud TPU is preempted, # you must delete it and create a new one ..." # See: https://cloud.google.com/tpu/docs/preemptible#tpu-vm + # On-demand TPU VMs are likely to require manual cleanup as well. return gcp_utils.is_tpu_vm(resources) diff --git a/sky/core.py b/sky/core.py index c93a50f0b7d..c71a3fa9734 100644 --- a/sky/core.py +++ b/sky/core.py @@ -196,8 +196,6 @@ def _start( idle_minutes_to_autostop = ( constants.CONTROLLER_IDLE_MINUTES_TO_AUTOSTOP) - # NOTE: if spot_queue() calls _start() and hits here, that entrypoint - # would have a cluster name (the controller) filled in. usage_lib.record_cluster_name_for_current_operation(cluster_name) with dag.Dag(): @@ -264,7 +262,7 @@ def start( ValueError: argument values are invalid: (1) the specified cluster does not exist; (2) if ``down`` is set to True but ``idle_minutes_to_autostop`` is None; (3) if the specified cluster is - the managed spot controller, and either ``idle_minutes_to_autostop`` + the managed jobs controller, and either ``idle_minutes_to_autostop`` is not None or ``down`` is True (omit them to use the default autostop settings). sky.exceptions.NotSupportedError: if the cluster to restart was @@ -317,7 +315,7 @@ def stop(cluster_name: str, purge: bool = False) -> None: ValueError: the specified cluster does not exist. RuntimeError: failed to stop the cluster. sky.exceptions.NotSupportedError: if the specified cluster is a spot - cluster, or a TPU VM Pod cluster, or the managed spot controller. + cluster, or a TPU VM Pod cluster, or the managed jobs controller. """ if controller_utils.Controllers.from_name(cluster_name) is not None: raise exceptions.NotSupportedError( @@ -372,7 +370,7 @@ def down(cluster_name: str, purge: bool = False) -> None: ValueError: the specified cluster does not exist. RuntimeError: failed to tear down the cluster. sky.exceptions.NotSupportedError: the specified cluster is the managed - spot controller. + jobs controller. """ handle = global_user_state.get_handle_from_cluster_name(cluster_name) if handle is None: @@ -559,7 +557,7 @@ def cancel( Additional arguments: _try_cancel_if_cluster_is_init: (bool) whether to try cancelling the job even if the cluster is not UP, but the head node is still alive. - This is used by the spot controller to cancel the job when the + This is used by the jobs controller to cancel the job when the worker node is preempted in the spot cluster. Raises: diff --git a/sky/exceptions.py b/sky/exceptions.py index 131b4675399..e3b33ea3e5e 100644 --- a/sky/exceptions.py +++ b/sky/exceptions.py @@ -52,11 +52,12 @@ class InvalidCloudConfigs(Exception): class ProvisionPrechecksError(Exception): - """Raised when a spot job fails prechecks before provision. + """Raised when a managed job fails prechecks before provision. + Developer note: For now this should only be used by managed - spot code path (technically, this can/should be raised by the + jobs code path (technically, this can/should be raised by the lower-level sky.launch()). Please refer to the docstring of - `spot.recovery_strategy._launch` for more details about when + `jobs.recovery_strategy._launch` for more details about when the error will be raised. Args: @@ -68,11 +69,11 @@ def __init__(self, reasons: List[Exception]) -> None: self.reasons = list(reasons) -class SpotJobReachedMaxRetriesError(Exception): - """Raised when a spot job fails to be launched after maximum retries. +class ManagedJobReachedMaxRetriesError(Exception): + """Raised when a managed job fails to be launched after maximum retries. - Developer note: For now this should only be used by managed spot code - path. Please refer to the docstring of `spot.recovery_strategy._launch` + Developer note: For now this should only be used by managed jobs code + path. Please refer to the docstring of `jobs.recovery_strategy._launch` for more details about when the error will be raised. """ pass @@ -211,8 +212,8 @@ class ClusterStatusFetchingError(Exception): pass -class SpotUserCancelledError(Exception): - """Raised when a spot user cancels the job.""" +class ManagedJobUserCancelledError(Exception): + """Raised when a user cancels a managed job.""" pass diff --git a/sky/execution.py b/sky/execution.py index 25f0d8cc7a8..2cffc5a7d09 100644 --- a/sky/execution.py +++ b/sky/execution.py @@ -108,7 +108,7 @@ def _execute( clone_disk_from: Optional[str] = None, # Internal only: # pylint: disable=invalid-name - _is_launched_by_spot_controller: bool = False, + _is_launched_by_jobs_controller: bool = False, _is_launched_by_sky_serve_controller: bool = False, ) -> Tuple[Optional[int], Optional[backends.ResourceHandle]]: """Execute an entrypoint. @@ -160,11 +160,11 @@ def _execute( assert len(dag) == 1, f'We support 1 task for now. {dag}' task = dag.tasks[0] - if task.need_spot_recovery: + if any(r.job_recovery is not None for r in task.resources): with ux_utils.print_exception_no_traceback(): raise ValueError( - 'Spot recovery is specified in the task. To launch the ' - 'managed spot job, please use: sky spot launch') + 'Job recovery is specified in the task. To launch a ' + 'managed job, please use: sky jobs launch') cluster_exists = False if cluster_name is not None: @@ -225,10 +225,10 @@ def _execute( task) if not cluster_exists: - # If spot is launched by skyserve controller or managed spot controller, - # We don't need to print out the logger info. + # If spot is launched on serve or jobs controller, we don't need to + # print out the hint. if (Stage.PROVISION in stages and task.use_spot and - not _is_launched_by_spot_controller and + not _is_launched_by_jobs_controller and not _is_launched_by_sky_serve_controller): yellow = colorama.Fore.YELLOW bold = colorama.Style.BRIGHT @@ -236,9 +236,9 @@ def _execute( logger.info( f'{yellow}Launching an unmanaged spot task, which does not ' f'automatically recover from preemptions.{reset}\n{yellow}To ' - 'get automatic recovery, use managed spot instead: ' - f'{reset}{bold}sky spot launch{reset} {yellow}or{reset} ' - f'{bold}sky.spot.launch(){reset}.') + 'get automatic recovery, use managed job instead: ' + f'{reset}{bold}sky jobs launch{reset} {yellow}or{reset} ' + f'{bold}sky.jobs.launch(){reset}.') if Stage.OPTIMIZE in stages: if task.best_resources is None: @@ -318,10 +318,10 @@ def _execute( if controller is None and not _is_launched_by_sky_serve_controller: # UX: print live clusters to make users aware (to save costs). # - # Don't print if this job is launched by the spot controller, - # because spot jobs are serverless, there can be many of them, and - # users tend to continuously monitor spot jobs using `sky spot - # status`. Also don't print if this job is a skyserve controller + # Don't print if this job is launched by the jobs controller, + # because managed jobs are serverless, there can be many of them, + # and users tend to continuously monitor managed jobs using `sky + # job queue`. Also don't print if this job is a skyserve controller # job or launched by a skyserve controller job, because the # redirect for this subprocess.run won't success and it will # pollute the controller logs. @@ -330,7 +330,7 @@ def _execute( env = dict(os.environ, **{env_options.Options.DISABLE_LOGGING.value: '1'}) subprocess_utils.run( - 'sky status --no-show-spot-jobs --no-show-services', env=env) + 'sky status --no-show-managed-jobs --no-show-services', env=env) print() print('\x1b[?25h', end='') # Show cursor. return job_id, handle @@ -354,7 +354,7 @@ def launch( clone_disk_from: Optional[str] = None, # Internal only: # pylint: disable=invalid-name - _is_launched_by_spot_controller: bool = False, + _is_launched_by_jobs_controller: bool = False, _is_launched_by_sky_serve_controller: bool = False, _disable_controller_check: bool = False, ) -> Tuple[Optional[int], Optional[backends.ResourceHandle]]: @@ -464,7 +464,7 @@ def launch( idle_minutes_to_autostop=idle_minutes_to_autostop, no_setup=no_setup, clone_disk_from=clone_disk_from, - _is_launched_by_spot_controller=_is_launched_by_spot_controller, + _is_launched_by_jobs_controller=_is_launched_by_jobs_controller, _is_launched_by_sky_serve_controller= _is_launched_by_sky_serve_controller, ) diff --git a/sky/spot/README.md b/sky/jobs/README.md similarity index 52% rename from sky/spot/README.md rename to sky/jobs/README.md index ed20a77b46e..579f675a5f9 100644 --- a/sky/spot/README.md +++ b/sky/jobs/README.md @@ -1,11 +1,11 @@ -# SkyPilot Managed Spot +# SkyPilot Managed Jobs -This module is used for running user jobs on spot clusters, which automatically recovers the job from preemptions. +This module is used for running and managing user jobs, which automatically recovers failed jobs from spot preemptions and/or machine failures. ## Concepts -- Task: A task (sky.Task) is a unit of work. SkyPilot will launch a spot cluster to run the task, automatically recover the task from preemptions, and terminate the cluster when the task is done. -- Job: A job in the context of SkyPilot managed spot, is equivalent to a SkyPilot DAG (sky.Dag). A job is a collection of tasks that are executed in a specific order based on the dependencies between the tasks. Each controller process will be in charge of the whole lifecycle of a job. +- Task: A task (sky.Task) is a unit of work. SkyPilot will launch a cluster to run the task, automatically recover the task from preemptions, and terminate the cluster when the task is done. +- Job: A job in the context of SkyPilot managed jobs, is equivalent to a SkyPilot DAG (sky.Dag). A job is a collection of tasks that are executed in a specific order based on the dependencies between the tasks. Each controller process will be in charge of the whole lifecycle of a job. Note that for singleton (1-task) jobs, we will use the term "task" and "job" interchangeably. @@ -14,6 +14,6 @@ A job of n tasks (experimental; we support a pipeline of such tasks only): the j ## Architecture -![Architecture](../../docs/source/images/spot-controller.png) - +![Architecture](../../docs/source/images/managed-jobs-arch.png) + diff --git a/sky/jobs/__init__.py b/sky/jobs/__init__.py new file mode 100644 index 00000000000..922bb613ff7 --- /dev/null +++ b/sky/jobs/__init__.py @@ -0,0 +1,43 @@ +"""Managed jobs.""" +import pathlib + +from sky.jobs.constants import JOBS_CLUSTER_NAME_PREFIX_LENGTH +from sky.jobs.constants import JOBS_CONTROLLER_TEMPLATE +from sky.jobs.constants import JOBS_CONTROLLER_YAML_PREFIX +from sky.jobs.constants import JOBS_TASK_YAML_PREFIX +from sky.jobs.core import cancel +from sky.jobs.core import launch +from sky.jobs.core import queue +from sky.jobs.core import tail_logs +from sky.jobs.recovery_strategy import DEFAULT_RECOVERY_STRATEGY +from sky.jobs.recovery_strategy import RECOVERY_STRATEGIES +from sky.jobs.state import ManagedJobStatus +from sky.jobs.utils import dump_managed_job_queue +from sky.jobs.utils import format_job_table +from sky.jobs.utils import JOB_CONTROLLER_NAME +from sky.jobs.utils import load_managed_job_queue +from sky.jobs.utils import ManagedJobCodeGen + +pathlib.Path(JOBS_TASK_YAML_PREFIX).expanduser().parent.mkdir(parents=True, + exist_ok=True) +__all__ = [ + 'RECOVERY_STRATEGIES', + 'DEFAULT_RECOVERY_STRATEGY', + 'JOB_CONTROLLER_NAME', + # Constants + 'JOBS_CONTROLLER_TEMPLATE', + 'JOBS_CONTROLLER_YAML_PREFIX', + 'JOBS_TASK_YAML_PREFIX', + # Enums + 'ManagedJobStatus', + # Core + 'cancel', + 'launch', + 'queue', + 'tail_logs', + # utils + 'ManagedJobCodeGen', + 'format_job_table', + 'dump_managed_job_queue', + 'load_managed_job_queue', +] diff --git a/sky/jobs/constants.py b/sky/jobs/constants.py new file mode 100644 index 00000000000..d5f32908317 --- /dev/null +++ b/sky/jobs/constants.py @@ -0,0 +1,27 @@ +"""Constants used for Managed Jobs.""" + +JOBS_CONTROLLER_TEMPLATE = 'jobs-controller.yaml.j2' +JOBS_CONTROLLER_YAML_PREFIX = '~/.sky/jobs_controller' + +JOBS_TASK_YAML_PREFIX = '~/.sky/managed_jobs' + +# Resources as a dict for the jobs controller. +# Use default CPU instance type for jobs controller with >= 24GB, i.e. +# m6i.2xlarge (8vCPUs, 32 GB) for AWS, Standard_D8s_v4 (8vCPUs, 32 GB) +# for Azure, and n1-standard-8 (8 vCPUs, 32 GB) for GCP, etc. +# Based on profiling, memory should be at least 3x (in GB) as num vCPUs to avoid +# OOM (each vCPU can have 4 jobs controller processes as we set the CPU +# requirement to 0.25, and 3 GB is barely enough for 4 job processes). +# We use 50 GB disk size to reduce the cost. +CONTROLLER_RESOURCES = {'cpus': '8+', 'memory': '3x', 'disk_size': 50} + +# Max length of the cluster name for GCP is 35, the user hash to be attached is +# 4+1 chars, and we assume the maximum length of the job id is 4+1, so the max +# length of the cluster name prefix is 25 to avoid the cluster name being too +# long and truncated twice during the cluster creation. +JOBS_CLUSTER_NAME_PREFIX_LENGTH = 25 + +# The version of the lib files that jobs/utils use. Whenever there is an API +# change for the jobs/utils, we need to bump this version and update +# job.utils.ManagedJobCodeGen to handle the version update. +MANAGED_JOBS_VERSION = 1 diff --git a/sky/spot/controller.py b/sky/jobs/controller.py similarity index 66% rename from sky/spot/controller.py rename to sky/jobs/controller.py index 9308d1dd532..39c89d2784b 100644 --- a/sky/spot/controller.py +++ b/sky/jobs/controller.py @@ -1,4 +1,4 @@ -"""Controller: handles the life cycle of a managed spot cluster (job).""" +"""Controller: handles the life cycle of a managed job.""" import argparse import multiprocessing import os @@ -15,11 +15,11 @@ from sky import status_lib from sky.backends import backend_utils from sky.backends import cloud_vm_ray_backend +from sky.jobs import recovery_strategy +from sky.jobs import state as managed_job_state +from sky.jobs import utils as managed_job_utils from sky.skylet import constants from sky.skylet import job_lib -from sky.spot import recovery_strategy -from sky.spot import spot_state -from sky.spot import spot_utils from sky.usage import usage_lib from sky.utils import common_utils from sky.utils import controller_utils @@ -31,9 +31,9 @@ import sky # Use the explicit logger name so that the logger is under the -# `sky.spot.controller` namespace when executed directly, so as +# `sky.jobs.controller` namespace when executed directly, so as # to inherit the setup from the `sky` logger. -logger = sky_logging.init_logger('sky.spot.controller') +logger = sky_logging.init_logger('sky.jobs.controller') def _get_dag_and_name(dag_yaml: str) -> Tuple['sky.Dag', str]: @@ -43,8 +43,8 @@ def _get_dag_and_name(dag_yaml: str) -> Tuple['sky.Dag', str]: return dag, dag_name -class SpotController: - """Each spot controller manages the life cycle of one spot job.""" +class JobsController: + """Each jobs controller manages the life cycle of one managed job.""" def __init__(self, job_id: int, dag_yaml: str, retry_until_up: bool) -> None: @@ -88,23 +88,23 @@ def __init__(self, job_id: int, dag_yaml: str, def _download_log_and_stream( self, handle: cloud_vm_ray_backend.CloudVmRayResourceHandle) -> None: - """Downloads and streams the logs of the latest job of a spot cluster. + """Downloads and streams the logs of the latest job. - We do not stream the logs from the spot cluster directly, as the + We do not stream the logs from the cluster directly, as the donwload and stream should be faster, and more robust against preemptions or ssh disconnection during the streaming. """ - spot_job_logs_dir = os.path.join(constants.SKY_LOGS_DIRECTORY, - 'spot_jobs') + managed_job_logs_dir = os.path.join(constants.SKY_LOGS_DIRECTORY, + 'managed_jobs') controller_utils.download_and_stream_latest_job_log( - self._backend, handle, spot_job_logs_dir) + self._backend, handle, managed_job_logs_dir) logger.info(f'\n== End of logs (ID: {self._job_id}) ==') def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool: - """Busy loop monitoring spot cluster status and handling recovery. + """Busy loop monitoring cluster status and handling recovery. When the task is successfully completed, this function returns True, - and will terminate the spot cluster before returning. + and will terminate the cluster before returning. If the user program fails, i.e. the task is set to FAILED or FAILED_SETUP, this function will return False. @@ -130,28 +130,28 @@ def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool: due to: 1. Any of the underlying failover exceptions is due to resources unavailability. - 2. The cluster is preempted before the job is submitted. + 2. The cluster is preempted or failed before the job is + submitted. 3. Any unexpected error happens during the `sky.launch`. Other exceptions may be raised depending on the backend. """ - callback_func = spot_utils.event_callback_func(job_id=self._job_id, - task_id=task_id, - task=task) + callback_func = managed_job_utils.event_callback_func( + job_id=self._job_id, task_id=task_id, task=task) if task.run is None: logger.info(f'Skip running task {task_id} ({task.name}) due to its ' 'run commands being empty.') # Call set_started first to initialize columns in the state table, # including start_at and last_recovery_at to avoid issues for # uninitialized columns. - spot_state.set_started(job_id=self._job_id, - task_id=task_id, - start_time=time.time(), - callback_func=callback_func) - spot_state.set_succeeded(job_id=self._job_id, - task_id=task_id, - end_time=time.time(), - callback_func=callback_func) + managed_job_state.set_started(job_id=self._job_id, + task_id=task_id, + start_time=time.time(), + callback_func=callback_func) + managed_job_state.set_succeeded(job_id=self._job_id, + task_id=task_id, + end_time=time.time(), + callback_func=callback_func) return True usage_lib.messages.usage.update_task_id(task_id) task_id_env_var = task.envs[constants.TASK_ID_ENV_VAR] @@ -159,64 +159,65 @@ def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool: if task_id == 0: submitted_at = backend_utils.get_timestamp_from_run_timestamp( self._backend.run_timestamp) - spot_state.set_submitted( + managed_job_state.set_submitted( self._job_id, task_id, self._backend.run_timestamp, submitted_at, - resources_str=backend_utils.get_task_resources_str(task), + resources_str=backend_utils.get_task_resources_str( + task, is_managed_job=True), callback_func=callback_func) logger.info( - f'Submitted spot job {self._job_id} (task: {task_id}, name: ' + f'Submitted managed job {self._job_id} (task: {task_id}, name: ' f'{task.name!r}); {constants.TASK_ID_ENV_VAR}: {task_id_env_var}') assert task.name is not None, task - cluster_name = spot_utils.generate_spot_cluster_name( + cluster_name = managed_job_utils.generate_managed_job_cluster_name( task.name, self._job_id) self._strategy_executor = recovery_strategy.StrategyExecutor.make( cluster_name, self._backend, task, self._retry_until_up) logger.info('Started monitoring.') - spot_state.set_starting(job_id=self._job_id, - task_id=task_id, - callback_func=callback_func) + managed_job_state.set_starting(job_id=self._job_id, + task_id=task_id, + callback_func=callback_func) remote_job_submitted_at = self._strategy_executor.launch() assert remote_job_submitted_at is not None, remote_job_submitted_at - spot_state.set_started(job_id=self._job_id, - task_id=task_id, - start_time=remote_job_submitted_at, - callback_func=callback_func) + managed_job_state.set_started(job_id=self._job_id, + task_id=task_id, + start_time=remote_job_submitted_at, + callback_func=callback_func) while True: - time.sleep(spot_utils.JOB_STATUS_CHECK_GAP_SECONDS) + time.sleep(managed_job_utils.JOB_STATUS_CHECK_GAP_SECONDS) # Check the network connection to avoid false alarm for job failure. # Network glitch was observed even in the VM. try: backend_utils.check_network_connection() except exceptions.NetworkError: - logger.info( - 'Network is not available. Retrying again in ' - f'{spot_utils.JOB_STATUS_CHECK_GAP_SECONDS} seconds.') + logger.info('Network is not available. Retrying again in ' + f'{managed_job_utils.JOB_STATUS_CHECK_GAP_SECONDS} ' + 'seconds.') continue # NOTE: we do not check cluster status first because race condition # can occur, i.e. cluster can be down during the job status check. - job_status = spot_utils.get_job_status(self._backend, cluster_name) + job_status = managed_job_utils.get_job_status( + self._backend, cluster_name) if job_status == job_lib.JobStatus.SUCCEEDED: - end_time = spot_utils.get_job_timestamp(self._backend, - cluster_name, - get_end_time=True) + end_time = managed_job_utils.get_job_timestamp( + self._backend, cluster_name, get_end_time=True) # The job is done. - spot_state.set_succeeded(self._job_id, - task_id, - end_time=end_time, - callback_func=callback_func) + managed_job_state.set_succeeded(self._job_id, + task_id, + end_time=end_time, + callback_func=callback_func) logger.info( f'Spot job {self._job_id} (task: {task_id}) SUCCEEDED. ' - f'Cleaning up the spot cluster {cluster_name}.') - # Only clean up the spot cluster, not the storages, because - # tasks may share storages. + f'Cleaning up the cluster {cluster_name}.') + # Only clean up the cluster, not the storages, because tasks may + # share storages. recovery_strategy.terminate_cluster(cluster_name=cluster_name) return True @@ -224,7 +225,8 @@ def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool: # healthy cluster. We can safely continue monitoring. # For multi-node jobs, since the job may not be set to FAILED # immediately (depending on user program) when only some of the - # nodes are preempted, need to check the actual cluster status. + # nodes are preempted or failed, need to check the actual cluster + # status. if (job_status is not None and not job_status.is_terminal() and task.num_nodes == 1): continue @@ -235,21 +237,28 @@ def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool: # Add a grace period before the check of preemption to avoid # false alarm for job failure. time.sleep(5) + # Pull the actual cluster status from the cloud provider to - # determine whether the cluster is preempted. + # determine whether the cluster is preempted or failed. + # TODO(zhwu): For hardware failure, such as GPU failure, it may not + # be reflected in the cluster status, depending on the cloud, which + # can also cause failure of the job, and we need to recover it + # rather than fail immediately. (cluster_status, handle) = backend_utils.refresh_cluster_status_handle( cluster_name, force_refresh_statuses=set(status_lib.ClusterStatus)) if cluster_status != status_lib.ClusterStatus.UP: - # The cluster is (partially) preempted. It can be down, INIT - # or STOPPED, based on the interruption behavior of the cloud. - # Spot recovery is needed (will be done later in the code). + # The cluster is (partially) preempted or failed. It can be + # down, INIT or STOPPED, based on the interruption behavior of + # the cloud. Spot recovery is needed (will be done later in the + # code). cluster_status_str = ('' if cluster_status is None else f' (status: {cluster_status.value})') logger.info( - f'Cluster is preempted{cluster_status_str}. Recovering...') + f'Cluster is preempted or failed{cluster_status_str}. ' + 'Recovering...') else: if job_status is not None and not job_status.is_terminal(): # The multi-node job is still running, continue monitoring. @@ -258,27 +267,29 @@ def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool: job_lib.JobStatus.FAILED, job_lib.JobStatus.FAILED_SETUP ]: # The user code has probably crashed, fail immediately. - end_time = spot_utils.get_job_timestamp(self._backend, - cluster_name, - get_end_time=True) + end_time = managed_job_utils.get_job_timestamp( + self._backend, cluster_name, get_end_time=True) logger.info( 'The user job failed. Please check the logs below.\n' f'== Logs of the user job (ID: {self._job_id}) ==\n') self._download_log_and_stream(handle) - spot_status_to_set = spot_state.SpotStatus.FAILED + managed_job_status = ( + managed_job_state.ManagedJobStatus.FAILED) if job_status == job_lib.JobStatus.FAILED_SETUP: - spot_status_to_set = spot_state.SpotStatus.FAILED_SETUP + managed_job_status = ( + managed_job_state.ManagedJobStatus.FAILED_SETUP) failure_reason = ( 'To see the details, run: ' - f'sky spot logs --controller {self._job_id}') - - spot_state.set_failed(self._job_id, - task_id, - failure_type=spot_status_to_set, - failure_reason=failure_reason, - end_time=end_time, - callback_func=callback_func) + f'sky jobs logs --controller {self._job_id}') + + managed_job_state.set_failed( + self._job_id, + task_id, + failure_type=managed_job_status, + failure_reason=failure_reason, + end_time=end_time, + callback_func=callback_func) return False # Although the cluster is healthy, we fail to access the # job status. Try to recover the job (will not restart the @@ -292,22 +303,24 @@ def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool: if handle is not None: resources = handle.launched_resources assert resources is not None, handle - if resources.need_cleanup_after_preemption(): + if resources.need_cleanup_after_preemption_or_failure(): # Some spot resource (e.g., Spot TPU VM) may need to be - # cleaned up after preemption. - logger.info('Cleaning up the preempted spot cluster...') + # cleaned up after preemption, as running launch again on + # those clusters again may fail. + logger.info('Cleaning up the preempted or failed cluster' + '...') recovery_strategy.terminate_cluster(cluster_name) - # Try to recover the spot jobs, when the cluster is preempted - # or the job status is failed to be fetched. - spot_state.set_recovering(job_id=self._job_id, - task_id=task_id, - callback_func=callback_func) + # Try to recover the managed jobs, when the cluster is preempted or + # failed or the job status is failed to be fetched. + managed_job_state.set_recovering(job_id=self._job_id, + task_id=task_id, + callback_func=callback_func) recovered_time = self._strategy_executor.recover() - spot_state.set_recovered(self._job_id, - task_id, - recovered_time=recovered_time, - callback_func=callback_func) + managed_job_state.set_recovered(self._job_id, + task_id, + recovered_time=recovered_time, + callback_func=callback_func) def run(self): """Run controller logic and handle exceptions.""" @@ -326,27 +339,29 @@ def run(self): common_utils.format_exception(reason, use_bracket=True) for reason in e.reasons)) logger.error(failure_reason) - spot_state.set_failed( + managed_job_state.set_failed( self._job_id, task_id=task_id, - failure_type=spot_state.SpotStatus.FAILED_PRECHECKS, + failure_type=managed_job_state.ManagedJobStatus. + FAILED_PRECHECKS, failure_reason=failure_reason, - callback_func=spot_utils.event_callback_func( + callback_func=managed_job_utils.event_callback_func( job_id=self._job_id, task_id=task_id, task=self._dag.tasks[task_id])) - except exceptions.SpotJobReachedMaxRetriesError as e: + except exceptions.ManagedJobReachedMaxRetriesError as e: # Please refer to the docstring of self._run for the cases when # this exception can occur. logger.error(common_utils.format_exception(e)) - # The spot job should be marked as FAILED_NO_RESOURCE, as the - # spot job may be able to launch next time. - spot_state.set_failed( + # The managed job should be marked as FAILED_NO_RESOURCE, as the + # managed job may be able to launch next time. + managed_job_state.set_failed( self._job_id, task_id=task_id, - failure_type=spot_state.SpotStatus.FAILED_NO_RESOURCE, + failure_type=managed_job_state.ManagedJobStatus. + FAILED_NO_RESOURCE, failure_reason=common_utils.format_exception(e), - callback_func=spot_utils.event_callback_func( + callback_func=managed_job_utils.event_callback_func( job_id=self._job_id, task_id=task_id, task=self._dag.tasks[task_id])) @@ -356,12 +371,13 @@ def run(self): msg = ('Unexpected error occurred: ' f'{common_utils.format_exception(e, use_bracket=True)}') logger.error(msg) - spot_state.set_failed( + managed_job_state.set_failed( self._job_id, task_id=task_id, - failure_type=spot_state.SpotStatus.FAILED_CONTROLLER, + failure_type=managed_job_state.ManagedJobStatus. + FAILED_CONTROLLER, failure_reason=msg, - callback_func=spot_utils.event_callback_func( + callback_func=managed_job_utils.event_callback_func( job_id=self._job_id, task_id=task_id, task=self._dag.tasks[task_id])) @@ -370,27 +386,28 @@ def run(self): # affect the jobs in terminal states. # We need to call set_cancelling before set_cancelled to make sure # the table entries are correctly set. - callback_func = spot_utils.event_callback_func( + callback_func = managed_job_utils.event_callback_func( job_id=self._job_id, task_id=task_id, task=self._dag.tasks[task_id]) - spot_state.set_cancelling(job_id=self._job_id, - callback_func=callback_func) - spot_state.set_cancelled(job_id=self._job_id, - callback_func=callback_func) + managed_job_state.set_cancelling(job_id=self._job_id, + callback_func=callback_func) + managed_job_state.set_cancelled(job_id=self._job_id, + callback_func=callback_func) def _run_controller(job_id: int, dag_yaml: str, retry_until_up: bool): """Runs the controller in a remote process for interruption.""" # The controller needs to be instantiated in the remote process, since # the controller is not serializable. - spot_controller = SpotController(job_id, dag_yaml, retry_until_up) - spot_controller.run() + jobs_controller = JobsController(job_id, dag_yaml, retry_until_up) + jobs_controller.run() def _handle_signal(job_id): """Handle the signal if the user sent it.""" - signal_file = pathlib.Path(spot_utils.SIGNAL_FILE_PREFIX.format(job_id)) + signal_file = pathlib.Path( + managed_job_utils.SIGNAL_FILE_PREFIX.format(job_id)) user_signal = None if signal_file.exists(): # Filelock is needed to prevent race condition with concurrent @@ -399,7 +416,7 @@ def _handle_signal(job_id): with signal_file.open(mode='r', encoding='utf-8') as f: user_signal = f.read().strip() try: - user_signal = spot_utils.UserSignal(user_signal) + user_signal = managed_job_utils.UserSignal(user_signal) except ValueError: logger.warning( f'Unknown signal received: {user_signal}. Ignoring.') @@ -409,28 +426,29 @@ def _handle_signal(job_id): if user_signal is None: # None or empty string. return - assert user_signal == spot_utils.UserSignal.CANCEL, ( + assert user_signal == managed_job_utils.UserSignal.CANCEL, ( f'Only cancel signal is supported, but {user_signal} got.') - raise exceptions.SpotUserCancelledError( + raise exceptions.ManagedJobUserCancelledError( f'User sent {user_signal.value} signal.') def _cleanup(job_id: int, dag_yaml: str): - """Clean up the spot cluster(s) and storages. + """Clean up the cluster(s) and storages. (1) Clean up the succeeded task(s)' ephemeral storage. The storage has to be cleaned up after the whole job is finished, as the tasks may share the same storage. - (2) Clean up the spot cluster(s) that are not cleaned up yet, which - can happen when the spot task failed or cancelled. At most one - spot cluster should be left when reaching here, as we currently - only support chain DAGs, and only spot task is executed at a time. + (2) Clean up the cluster(s) that are not cleaned up yet, which can happen + when the task failed or cancelled. At most one cluster should be left + when reaching here, as we currently only support chain DAGs, and only + task is executed at a time. """ # NOTE: The code to get cluster name is same as what we did in the spot - # controller, we should keep it in sync with SpotController.__init__() + # controller, we should keep it in sync with JobsController.__init__() dag, _ = _get_dag_and_name(dag_yaml) for task in dag.tasks: - cluster_name = spot_utils.generate_spot_cluster_name(task.name, job_id) + cluster_name = managed_job_utils.generate_managed_job_cluster_name( + task.name, job_id) recovery_strategy.terminate_cluster(cluster_name) # Clean up Storages with persistent=False. # TODO(zhwu): this assumes the specific backend. @@ -457,16 +475,15 @@ def start(job_id, dag_yaml, retry_until_up): while controller_process.is_alive(): _handle_signal(job_id) time.sleep(1) - except exceptions.SpotUserCancelledError: + except exceptions.ManagedJobUserCancelledError: dag, _ = _get_dag_and_name(dag_yaml) - task_id, _ = (spot_state.get_latest_task_id_status(job_id)) + task_id, _ = managed_job_state.get_latest_task_id_status(job_id) logger.info( - f'Cancelling spot job, job_id: {job_id}, task_id: {task_id}') - spot_state.set_cancelling(job_id=job_id, - callback_func=spot_utils.event_callback_func( - job_id=job_id, - task_id=task_id, - task=dag.tasks[task_id])) + f'Cancelling managed job, job_id: {job_id}, task_id: {task_id}') + managed_job_state.set_cancelling( + job_id=job_id, + callback_func=managed_job_utils.event_callback_func( + job_id=job_id, task_id=task_id, task=dag.tasks[task_id])) cancelling = True finally: if controller_process is not None: @@ -480,37 +497,38 @@ def start(job_id, dag_yaml, retry_until_up): controller_process.join() logger.info(f'Controller process {controller_process.pid} killed.') - logger.info(f'Cleaning up any spot cluster for job {job_id}.') + logger.info(f'Cleaning up any cluster for job {job_id}.') # NOTE: Originally, we send an interruption signal to the controller # process and the controller process handles cleanup. However, we # figure out the behavior differs from cloud to cloud # (e.g., GCP ignores 'SIGINT'). A possible explanation is # https://unix.stackexchange.com/questions/356408/strange-problem-with-trap-and-sigint # But anyway, a clean solution is killing the controller process - # directly, and then cleanup the cluster state. + # directly, and then cleanup the cluster job_state. _cleanup(job_id, dag_yaml=dag_yaml) - logger.info(f'Spot cluster of job {job_id} has been cleaned up.') + logger.info(f'Cluster of managed job {job_id} has been cleaned up.') if cancelling: - spot_state.set_cancelled( + managed_job_state.set_cancelled( job_id=job_id, - callback_func=spot_utils.event_callback_func( + callback_func=managed_job_utils.event_callback_func( job_id=job_id, task_id=task_id, task=dag.tasks[task_id])) # We should check job status after 'set_cancelled', otherwise # the job status is not terminal. - job_status = spot_state.get_status(job_id) + job_status = managed_job_state.get_status(job_id) assert job_status is not None # The job can be non-terminal if the controller exited abnormally, # e.g. failed to launch cluster after reaching the MAX_RETRY. if not job_status.is_terminal(): - logger.info(f'Previous spot job status: {job_status.value}') - spot_state.set_failed( + logger.info(f'Previous job status: {job_status.value}') + managed_job_state.set_failed( job_id, task_id=None, - failure_type=spot_state.SpotStatus.FAILED_CONTROLLER, + failure_type=managed_job_state.ManagedJobStatus. + FAILED_CONTROLLER, failure_reason=('Unexpected error occurred. For details, ' - f'run: sky spot logs --controller {job_id}')) + f'run: sky jobs logs --controller {job_id}')) if __name__ == '__main__': @@ -521,10 +539,10 @@ def start(job_id, dag_yaml, retry_until_up): help='Job id for the controller job.') parser.add_argument('--retry-until-up', action='store_true', - help='Retry until the spot cluster is up.') + help='Retry until the cluster is up.') parser.add_argument('dag_yaml', type=str, - help='The path to the user spot task yaml file.') + help='The path to the user job yaml file.') args = parser.parse_args() # We start process with 'spawn', because 'fork' could result in weird # behaviors; 'spawn' is also cross-platform. diff --git a/sky/spot/core.py b/sky/jobs/core.py similarity index 68% rename from sky/spot/core.py rename to sky/jobs/core.py index 61673e4728f..ff9953489d5 100644 --- a/sky/spot/core.py +++ b/sky/jobs/core.py @@ -1,4 +1,4 @@ -"""SDK functions for managed spot job.""" +"""SDK functions for managed jobs.""" import os import tempfile from typing import Any, Dict, List, Optional, Union @@ -14,9 +14,9 @@ from sky import task as task_lib from sky.backends import backend_utils from sky.clouds.service_catalog import common as service_catalog_common +from sky.jobs import constants as managed_job_constants +from sky.jobs import utils as managed_job_utils from sky.skylet import constants as skylet_constants -from sky.spot import constants -from sky.spot import spot_utils from sky.usage import usage_lib from sky.utils import common_utils from sky.utils import controller_utils @@ -35,18 +35,19 @@ def launch( retry_until_up: bool = False, ) -> None: # NOTE(dev): Keep the docstring consistent between the Python API and CLI. - """Launch a managed spot job. + """Launch a managed job. - Please refer to the sky.cli.spot_launch for the document. + Please refer to sky.cli.job_launch for documentation. Args: task: sky.Task, or sky.Dag (experimental; 1-task only) to launch as a - managed spot job. - name: Name of the spot job. + managed job. + name: Name of the managed job. detach_run: Whether to detach the run. Raises: - ValueError: cluster does not exist. + ValueError: cluster does not exist. Or, the entrypoint is not a valid + chain dag. sky.exceptions.NotSupportedError: the feature is not supported. """ entrypoint = task @@ -55,8 +56,8 @@ def launch( dag = dag_utils.convert_entrypoint_to_dag(entrypoint) if not dag.is_chain(): with ux_utils.print_exception_no_traceback(): - raise ValueError('Only single-task or chain DAG is allowed for ' - f'sky.spot.launch. Dag:\n{dag}') + raise ValueError('Only single-task or chain DAG is ' + f'allowed for job_launch. Dag: {dag}') dag_utils.maybe_infer_and_fill_dag_and_task_names(dag) @@ -71,28 +72,29 @@ def launch( 'will be auto-generated) .') task_names.add(task_.name) - dag_utils.fill_default_spot_config_in_dag_for_spot_launch(dag) + dag_utils.fill_default_config_in_dag_for_job_launch(dag) for task_ in dag.tasks: controller_utils.maybe_translate_local_file_mounts_and_sync_up( - task_, path='spot') + task_, path='jobs') - with tempfile.NamedTemporaryFile(prefix=f'spot-dag-{dag.name}-', + with tempfile.NamedTemporaryFile(prefix=f'managed-dag-{dag.name}-', mode='w') as f: dag_utils.dump_chain_dag_to_yaml(dag, f.name) - controller_name = spot_utils.SPOT_CONTROLLER_NAME - prefix = constants.SPOT_TASK_YAML_PREFIX + controller = controller_utils.Controllers.JOBS_CONTROLLER + controller_name = controller.value.cluster_name + prefix = managed_job_constants.JOBS_TASK_YAML_PREFIX remote_user_yaml_path = f'{prefix}/{dag.name}-{dag_uuid}.yaml' remote_user_config_path = f'{prefix}/{dag.name}-{dag_uuid}.config_yaml' controller_resources = controller_utils.get_controller_resources( - controller_type='spot', + controller=controller_utils.Controllers.JOBS_CONTROLLER, task_resources=sum([list(t.resources) for t in dag.tasks], [])) vars_to_fill = { 'remote_user_yaml_path': remote_user_yaml_path, 'user_yaml_path': f.name, - 'spot_controller': controller_name, - # Note: actual spot cluster name will be - + 'jobs_controller': controller_name, + # Note: actual cluster name will be - 'dag_name': dag.name, 'retry_until_up': retry_until_up, 'remote_user_config_path': remote_user_config_path, @@ -100,27 +102,29 @@ def launch( 'modified_catalogs': service_catalog_common.get_modified_catalog_file_mounts(), **controller_utils.shared_controller_vars_to_fill( - 'spot', + controller_utils.Controllers.JOBS_CONTROLLER, remote_user_config_path=remote_user_config_path, ), } - yaml_path = os.path.join(constants.SPOT_CONTROLLER_YAML_PREFIX, - f'{name}-{dag_uuid}.yaml') - common_utils.fill_template(constants.SPOT_CONTROLLER_TEMPLATE, - vars_to_fill, - output_path=yaml_path) + yaml_path = os.path.join( + managed_job_constants.JOBS_CONTROLLER_YAML_PREFIX, + f'{name}-{dag_uuid}.yaml') + common_utils.fill_template( + managed_job_constants.JOBS_CONTROLLER_TEMPLATE, + vars_to_fill, + output_path=yaml_path) controller_task = task_lib.Task.from_yaml(yaml_path) controller_task.set_resources(controller_resources) - controller_task.spot_dag = dag - assert len(controller_task.resources) == 1 + controller_task.managed_job_dag = dag + assert len(controller_task.resources) == 1, controller_task sky_logging.print( f'{colorama.Fore.YELLOW}' - f'Launching managed spot job {dag.name!r} from spot controller...' + f'Launching managed job {dag.name!r} from jobs controller...' f'{colorama.Style.RESET_ALL}') - sky_logging.print('Launching spot controller...') + sky_logging.print('Launching jobs controller...') sky.launch(task=controller_task, stream_logs=stream_logs, cluster_name=controller_name, @@ -134,9 +138,9 @@ def launch( @usage_lib.entrypoint def queue(refresh: bool, skip_finished: bool = False) -> List[Dict[str, Any]]: # NOTE(dev): Keep the docstring consistent between the Python API and CLI. - """Get statuses of managed spot jobs. + """Get statuses of managed jobs. - Please refer to the sky.cli.spot_queue for the documentation. + Please refer to sky.cli.job_queue for documentation. Returns: [ @@ -148,23 +152,23 @@ def queue(refresh: bool, skip_finished: bool = False) -> List[Dict[str, Any]]: 'end_at': (float) timestamp of end, 'duration': (float) duration in seconds, 'recovery_count': (int) Number of retries, - 'status': (sky.spot.SpotStatus) of the job, + 'status': (sky.jobs.ManagedJobStatus) of the job, 'cluster_resources': (str) resources of the cluster, 'region': (str) region of the cluster, } ] Raises: - sky.exceptions.ClusterNotUpError: the spot controller is not up or + sky.exceptions.ClusterNotUpError: the jobs controller is not up or does not exist. - RuntimeError: if failed to get the spot jobs with ssh. + RuntimeError: if failed to get the managed jobs with ssh. """ + jobs_controller_type = controller_utils.Controllers.JOBS_CONTROLLER stopped_message = '' if not refresh: - stopped_message = 'No in-progress spot jobs.' + stopped_message = 'No in-progress managed jobs.' try: handle = backend_utils.is_controller_accessible( - controller_type=controller_utils.Controllers.SPOT_CONTROLLER, - stopped_message=stopped_message) + controller=jobs_controller_type, stopped_message=stopped_message) except exceptions.ClusterNotUpError as e: if not refresh: raise @@ -176,18 +180,19 @@ def queue(refresh: bool, skip_finished: bool = False) -> List[Dict[str, Any]]: 'Restarting controller for latest status...' f'{colorama.Style.RESET_ALL}') - rich_utils.force_update_status('[cyan] Checking spot jobs - restarting ' - 'controller[/]') - handle = sky.start(spot_utils.SPOT_CONTROLLER_NAME) + rich_utils.force_update_status( + '[cyan] Checking managed jobs - restarting ' + 'controller[/]') + handle = sky.start(jobs_controller_type.value.cluster_name) controller_status = status_lib.ClusterStatus.UP - rich_utils.force_update_status('[cyan] Checking spot jobs[/]') + rich_utils.force_update_status('[cyan] Checking managed jobs[/]') assert handle is not None, (controller_status, refresh) backend = backend_utils.get_backend_from_handle(handle) assert isinstance(backend, backends.CloudVmRayBackend) - code = spot_utils.SpotCodeGen.get_job_table() + code = managed_job_utils.ManagedJobCodeGen.get_job_table() returncode, job_table_payload, stderr = backend.run_on_head( handle, code, @@ -198,13 +203,13 @@ def queue(refresh: bool, skip_finished: bool = False) -> List[Dict[str, Any]]: try: subprocess_utils.handle_returncode(returncode, code, - 'Failed to fetch managed spot jobs', + 'Failed to fetch managed jobs', job_table_payload + stderr, stream_logs=False) except exceptions.CommandError as e: raise RuntimeError(str(e)) from e - jobs = spot_utils.load_spot_job_queue(job_table_payload) + jobs = managed_job_utils.load_managed_job_queue(job_table_payload) if skip_finished: # Filter out the finished jobs. If a multi-task job is partially # finished, we will include all its tasks. @@ -222,18 +227,18 @@ def cancel(name: Optional[str] = None, job_ids: Optional[List[int]] = None, all: bool = False) -> None: # NOTE(dev): Keep the docstring consistent between the Python API and CLI. - """Cancel managed spot jobs. + """Cancel managed jobs. - Please refer to the sky.cli.spot_cancel for the document. + Please refer to sky.cli.job_cancel for documentation. Raises: - sky.exceptions.ClusterNotUpError: the spot controller is not up. + sky.exceptions.ClusterNotUpError: the jobs controller is not up. RuntimeError: failed to cancel the job. """ job_ids = [] if job_ids is None else job_ids handle = backend_utils.is_controller_accessible( - controller_type=controller_utils.Controllers.SPOT_CONTROLLER, - stopped_message='All managed spot jobs should have finished.') + controller=controller_utils.Controllers.JOBS_CONTROLLER, + stopped_message='All managed jobs should have finished.') job_id_str = ','.join(map(str, job_ids)) if sum([len(job_ids) > 0, name is not None, all]) != 1: @@ -247,12 +252,12 @@ def cancel(name: Optional[str] = None, backend = backend_utils.get_backend_from_handle(handle) assert isinstance(backend, backends.CloudVmRayBackend) if all: - code = spot_utils.SpotCodeGen.cancel_jobs_by_id(None) + code = managed_job_utils.ManagedJobCodeGen.cancel_jobs_by_id(None) elif job_ids: - code = spot_utils.SpotCodeGen.cancel_jobs_by_id(job_ids) + code = managed_job_utils.ManagedJobCodeGen.cancel_jobs_by_id(job_ids) else: assert name is not None, (job_ids, name, all) - code = spot_utils.SpotCodeGen.cancel_job_by_name(name) + code = managed_job_utils.ManagedJobCodeGen.cancel_job_by_name(name) # The stderr is redirected to stdout returncode, stdout, _ = backend.run_on_head(handle, code, @@ -260,7 +265,7 @@ def cancel(name: Optional[str] = None, stream_logs=False) try: subprocess_utils.handle_returncode(returncode, code, - 'Failed to cancel managed spot job', + 'Failed to cancel managed job', stdout) except exceptions.CommandError as e: with ux_utils.print_exception_no_traceback(): @@ -276,42 +281,49 @@ def cancel(name: Optional[str] = None, @usage_lib.entrypoint def tail_logs(name: Optional[str], job_id: Optional[int], follow: bool) -> None: # NOTE(dev): Keep the docstring consistent between the Python API and CLI. - """Tail logs of managed spot jobs. + """Tail logs of managed jobs. - Please refer to the sky.cli.spot_logs for the document. + Please refer to sky.cli.job_logs for documentation. Raises: ValueError: invalid arguments. - sky.exceptions.ClusterNotUpError: the spot controller is not up. + sky.exceptions.ClusterNotUpError: the jobs controller is not up. """ - # TODO(zhwu): Automatically restart the spot controller + # TODO(zhwu): Automatically restart the jobs controller + jobs_controller_type = controller_utils.Controllers.JOBS_CONTROLLER handle = backend_utils.is_controller_accessible( - controller_type=controller_utils.Controllers.SPOT_CONTROLLER, - stopped_message=('Please restart the spot controller with ' - f'`sky start {spot_utils.SPOT_CONTROLLER_NAME}`.')) + controller=jobs_controller_type, + stopped_message=( + 'Please restart the jobs controller with ' + f'`sky start {jobs_controller_type.value.cluster_name}`.')) if name is not None and job_id is not None: raise ValueError('Cannot specify both name and job_id.') backend = backend_utils.get_backend_from_handle(handle) assert isinstance(backend, backends.CloudVmRayBackend), backend # Stream the realtime logs - backend.tail_spot_logs(handle, job_id=job_id, job_name=name, follow=follow) - - -spot_launch = common_utils.deprecated_function(launch, - name='sky.spot.launch', - deprecated_name='spot_launch', - removing_version='0.7.0') + backend.tail_managed_job_logs(handle, + job_id=job_id, + job_name=name, + follow=follow) + + +spot_launch = common_utils.deprecated_function( + launch, + name='sky.jobs.launch', + deprecated_name='spot_launch', + removing_version='0.8.0', + override_argument={'use_spot': True}) spot_queue = common_utils.deprecated_function(queue, - name='sky.spot.queue', + name='sky.jobs.queue', deprecated_name='spot_queue', - removing_version='0.7.0') + removing_version='0.8.0') spot_cancel = common_utils.deprecated_function(cancel, - name='sky.spot.cancel', + name='sky.jobs.cancel', deprecated_name='spot_cancel', - removing_version='0.7.0') + removing_version='0.8.0') spot_tail_logs = common_utils.deprecated_function( tail_logs, - name='sky.spot.tail_logs', + name='sky.jobs.tail_logs', deprecated_name='spot_tail_logs', - removing_version='0.7.0') + removing_version='0.8.0') diff --git a/sky/jobs/dashboard/dashboard.py b/sky/jobs/dashboard/dashboard.py new file mode 100644 index 00000000000..89c97274646 --- /dev/null +++ b/sky/jobs/dashboard/dashboard.py @@ -0,0 +1,87 @@ +"""Dashboard for managed jobs based on Flask. + +TODO(zongheng): This is a basic version. In the future we can beef up the web +frameworks used (e.g., +https://github.com/ray-project/ray/tree/master/dashboard/client/src) and/or get +rid of the SSH port-forwarding business (see cli.py's job_dashboard() +comment). +""" +import datetime +import pathlib + +import flask +import yaml + +from sky import jobs as managed_jobs +from sky.utils import common_utils +from sky.utils import controller_utils + +app = flask.Flask(__name__) + + +def _is_running_on_jobs_controller() -> bool: + """Am I running on jobs controller? + + Loads ~/.sky/sky_ray.yml and check cluster_name. + """ + if pathlib.Path('~/.sky/sky_ray.yml').expanduser().exists(): + config = yaml.safe_load( + pathlib.Path('~/.sky/sky_ray.yml').expanduser().read_text()) + cluster_name = config.get('cluster_name', '') + candidate_controller_names = ( + controller_utils.Controllers.JOBS_CONTROLLER.value. + candidate_cluster_names) + # We use startswith instead of exact match because the cluster name in + # the yaml file is cluster_name_on_cloud which may have additional + # suffices. + return any( + cluster_name.startswith(name) + for name in candidate_controller_names) + return False + + +@app.route('/') +def home(): + if not _is_running_on_jobs_controller(): + # Experimental: run on laptop (refresh is very slow). + all_managed_jobs = managed_jobs.queue(refresh=True, skip_finished=False) + else: + job_table = managed_jobs.dump_managed_job_queue() + all_managed_jobs = managed_jobs.load_managed_job_queue(job_table) + + timestamp = datetime.datetime.now(datetime.timezone.utc) + rows = managed_jobs.format_job_table(all_managed_jobs, + show_all=True, + return_rows=True) + # Add an empty column for the dropdown button. This will be added in the + # jobs/templates/index.html file. + rows = [[''] + row for row in rows] + + # FIXME(zongheng): make the job table/queue funcs return structured info so + # that we don't have to do things like row[-5] below. + columns = [ + '', 'ID', 'Task', 'Name', 'Resources', 'Submitted', 'Total Duration', + 'Job Duration', 'Recoveries', 'Status', 'Started', 'Cluster', 'Region', + 'Failure' + ] + if rows and len(rows[0]) != len(columns): + raise RuntimeError( + 'Dashboard code and managed job queue code are out of sync.') + + # Fix STATUS color codes: '\x1b[33mCANCELLED\x1b[0m' -> 'CANCELLED'. + for row in rows: + row[-5] = common_utils.remove_color(row[-5]) + # Remove filler rows ([''], ..., ['-']). + rows = [row for row in rows if ''.join(map(str, row)) != ''] + + rendered_html = flask.render_template( + 'index.html', + columns=columns, + rows=rows, + last_updated_timestamp=timestamp, + ) + return rendered_html + + +if __name__ == '__main__': + app.run() diff --git a/sky/spot/dashboard/static/favicon.ico b/sky/jobs/dashboard/static/favicon.ico similarity index 100% rename from sky/spot/dashboard/static/favicon.ico rename to sky/jobs/dashboard/static/favicon.ico diff --git a/sky/spot/dashboard/templates/index.html b/sky/jobs/dashboard/templates/index.html similarity index 50% rename from sky/spot/dashboard/templates/index.html rename to sky/jobs/dashboard/templates/index.html index f8267cd3e5f..af4f5708bce 100644 --- a/sky/spot/dashboard/templates/index.html +++ b/sky/jobs/dashboard/templates/index.html @@ -6,7 +6,8 @@ SkyPilot Dashboard - +