diff --git a/README.md b/README.md index 06bc02e..c805eeb 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,57 @@ # new-cloud-skeleton + +*ℹ️ NOTE: This guide will change as SkyPilot develops. Please check back often to make sure you have the most up-to-date version. The README in this doc has been derived from the [Google Doc](https://docs.google.com/document/d/1iuPyQ47HloKuHfOYjcRNz7HAlUxFHs2WjMqedYmcPVQ/edit#heading=h.nby65cfuzxoq)* + Skeleton repo for what's needed to add a new cloud. Detailed instructions upcoming! Contact the dev team on [SkyPilot Slack](https://slack.skypilot.co/) to get access. + +## Introduction to SkyPilot + +[SkyPilot](https://github.com/skypilot-org/skypilot) is an intercloud broker -- a framework for running workloads on any cloud. Here are some useful links to learn more: + +1. [Introductory Blogpost](https://medium.com/@zongheng_yang/skypilot-ml-and-data-science-on-any-cloud-with-massive-cost-savings-244189cc7c0f) [Start here if you are new] +2. [Documentation](https://skypilot.readthedocs.io/en/latest/) +3. [The Sky Above the Clouds](https://arxiv.org/abs/2205.07147) +4. [GitHub](https://github.com/skypilot-org/skypilot) + +## How does SkyPilot work? + +Here's a simplified overview of SkyPilot's architecture. + +TODO: diagram in Google Doc + +In this diagram, the user has two clouds enabled (AWS and GCP). This is what happens when a user launches a job with `sky launch`: + +1. The optimizer reads the AWS Catalog and the GCP Catalog and runs an algorithm to decide which cloud to run the job on. (Let's suppose the optimizer chooses AWS.) This information is then sent to the provisioner+executor. + - A catalog is a list of instance types and their prices. +2. The provisioner+executor executes ray commands to launch a cluster on AWS. + - The AWS Node Provider is the interface between ray and AWS, translating ray function calls to AWS API calls. +3. 
Once the cluster is launched, the provisioner+executor SSHes into the cluster to execute some AWS setup commands. These are used to download some important packages on the cluster. +4. The provisioner+executor submits the job to the cluster and the cluster runs the job. + +When all is done, the user can run `sky down` and the provisioner+executor will tear down the cluster by executing more ray commands. + +## Getting Started + +Now let's say you have a new cloud, called FluffyCloud, that you want SkyPilot to support. What do you need to do? + +You need to: + +1. Write a NodeProvider for FluffyCloud. This is the most important part. +2. Add the FluffyCloud catalog to SkyPilot and write functions that read this catalog. +3. Write FluffyCloud setup code. +4. Add a FluffyCloud credential check to verify locally stored credentials. This is needed for a user to enable FluffyCloud. + +For reference, here is an actual merged PR for adding a new cloud to help you estimate what is required: + +- [Lambda Cloud](https://github.com/skypilot-org/skypilot/pull/1557) + +By completing the following steps, you will be able to run SkyPilot on FluffyCloud. + +- [Step 0](/docs/integration_steps/step_0-api-library.md) +- [Step 1](/docs/integration_steps/step_1-node-provider.md) +- [Step 2](/docs/integration_steps/step_2-catalog.md) +- [Step 3](/docs/integration_steps/step_3-cloud-class.md) +- [Step 4](/docs/integration_steps/step_4-setup-code.md) +- [Step 5](/docs/integration_steps/step_5-e2e-failover.md) diff --git a/docs/integration_steps/step_0-api-library.md b/docs/integration_steps/step_0-api-library.md new file mode 100644 index 0000000..584b960 --- /dev/null +++ b/docs/integration_steps/step_0-api-library.md @@ -0,0 +1,14 @@ +# Cloud Python Library Coverage + +This document is primarily for the maintainers of Python libraries for their respective clouds. 
It provides a list of functions that SkyPilot will need; by integrating these functions into your Python library, you can drastically reduce the amount of work and complexity needed to add your cloud to SkyPilot. + +## Function List + +1. Launch an instance +2. Remove an instance +3. Set instance tags +4. List instances + +## API Wrapper/Middleware + +It is likely that you will need a wrapper that converts the output of your Python library into the format required by SkyPilot. diff --git a/docs/integration_steps/step_1-node-provider.md b/docs/integration_steps/step_1-node-provider.md new file mode 100644 index 0000000..7ef4d47 --- /dev/null +++ b/docs/integration_steps/step_1-node-provider.md @@ -0,0 +1,15 @@ +# Node Provider + +NodeProvider is the interface that ray uses to interact with cloud providers. First, you should read through the NodeProvider class definition [here](https://github.com/ray-project/ray/blob/master/python/ray/autoscaler/node_provider.py). The docstrings give a good idea of what the NodeProvider class does. + +## Implementing a Node Provider + +1. Create the directory `sky/skylet/providers/{cloud_name}` +2. Add `__init__.py` to the directory and add the following code: + + ```python + from sky.skylet.providers.{cloud_name}.node_provider import {CloudName}NodeProvider + ``` + +3. Copy the `node_provider.py` template into the directory. +4. Complete the template. The template has comments to guide you through the process. 
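To make the wrapper idea from Step 0 concrete, here is a minimal sketch of middleware that normalizes a cloud SDK's raw instance listing into a `{instance_id: {status, tags, name, ip}}` mapping, which is the shape the NodeProvider template later in this guide expects. Everything here (the raw response fields, the state names, the `_raw_list_instances` helper) is hypothetical, not part of any real FluffyCloud SDK:

```python
from typing import Any, Dict, List


def _raw_list_instances(api_key: str) -> List[Dict[str, Any]]:
    # Placeholder for the real SDK call; the response shape is invented
    # for illustration only.
    return [
        {'id': 'i-123', 'state': 'ACTIVE', 'labels': {'env': 'dev'},
         'hostname': 'node-1', 'public_ipv4': '203.0.113.10'},
    ]


# Map the provider's native states onto simple status strings.
_STATE_MAP = {'ACTIVE': 'running', 'BUILD': 'starting', 'DELETED': 'terminated'}


def list_instances(api_key: str) -> Dict[str, Dict[str, Any]]:
    """Return instances keyed by id, in the shape the NodeProvider expects."""
    out = {}
    for inst in _raw_list_instances(api_key):
        out[inst['id']] = {
            'status': _STATE_MAP.get(inst['state'], 'unknown'),
            'tags': dict(inst['labels']),
            'name': inst['hostname'],
            'ip': inst['public_ipv4'],
        }
    return out


print(list_instances('dummy-key')['i-123']['status'])  # running
```

Keeping the state mapping in one place like this makes it easy to audit which native states count as "running" when you later wire up status queries and failover.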
diff --git a/docs/integration_steps/step_2-catalog.md b/docs/integration_steps/step_2-catalog.md new file mode 100644 index 0000000..e2b1756 --- /dev/null +++ b/docs/integration_steps/step_2-catalog.md @@ -0,0 +1,21 @@ +# Cloud Catalog + +A catalog is a CSV file in the [SkyPilot Catalogs](https://github.com/skypilot-org/skypilot-catalog) repository, with the following fields: + +| Field | Type | Description | +|--------------------|--------|-------------------------------------------------------------------------------------| +| `InstanceType` | string | The type of instance. | +| `vCPUs` | float | The number of virtual CPUs. | +| `MemoryGiB` | float | The amount of memory in GiB. | +| `AcceleratorName` | string | The name of accelerators (GPU/TPU). | +| `AcceleratorCount` | float | The number of accelerators (GPU/TPU). | +| `GPUInfo` | string | The human-readable information of the GPU (not used in code). | +| `Region` | string | The region of the resource. | +| `AvailabilityZone` | string | The availability zone of the resource (can be empty if not supported in the cloud). | +| `Price` | float | The price of the resource. | +| `SpotPrice` | float | The spot price of the resource. | + + +## Parsing Catalog + +Create a copy of `fluffycloud_catalog.py` and place it at `sky/clouds/service_catalog/{cloudname}_catalog.py`. Aside from renaming FluffyCloud to your cloud name, you do not need to make additional changes. diff --git a/docs/integration_steps/step_3-cloud-class.md b/docs/integration_steps/step_3-cloud-class.md new file mode 100644 index 0000000..6f0bc29 --- /dev/null +++ b/docs/integration_steps/step_3-cloud-class.md @@ -0,0 +1,9 @@ +# Cloud Class + +This class calls some service catalog functions and contains code that checks FluffyCloud credentials. Many of the functions in this class are also straightforward to implement. 
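As a rough illustration of the credential check this class performs, the sketch below simply verifies that the files named in `_CREDENTIAL_FILES` exist locally. The credential path and the exact `(ok, reason)` return convention are assumptions here, so treat this as a starting point rather than SkyPilot's definitive interface:

```python
import os
from typing import Optional, Tuple

# Hypothetical credential file location for the FluffyCloud example.
_CREDENTIAL_FILES = ['~/.fluffycloud/api_key']


def check_credentials() -> Tuple[bool, Optional[str]]:
    """Return (True, None) if credentials are present, else (False, reason)."""
    for path in _CREDENTIAL_FILES:
        if not os.path.exists(os.path.expanduser(path)):
            return False, f'Missing credential file: {path}'
    return True, None
```

A check like this runs when a user enables the cloud, so the failure reason should tell the user exactly which file to create.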
+ +Start by creating a copy of `fluffycloud/fluffycloud.py` and place it at `sky/clouds/fluffycloud.py`. + +## Credentials file + +The credentials file contains the user's credentials required to access your cloud. You can specify the location of files to check for credentials by adding them to the `_CREDENTIAL_FILES` list. diff --git a/docs/integration_steps/step_4-setup-code.md b/docs/integration_steps/step_4-setup-code.md new file mode 100644 index 0000000..9c0708d --- /dev/null +++ b/docs/integration_steps/step_4-setup-code.md @@ -0,0 +1,16 @@ +# Setup Code + +This code is executed (via SSH) after a cluster is launched. Most of this code is very similar to the existing setup code for other clouds, and can often be copied almost verbatim. + +Create a copy of `fluffycloud-ray.yml.j2` and place it at `sky/templates/fluffycloud-ray.yml.j2` + +## Ray Backend + +Open `sky/backends/cloud_vm_ray_backend.py` and edit the `_get_cluster_config_template` function to include the new cloud. + +Open `sky/backends/cloud_vm_ray_backend.py` and edit the `_add_auth_to_cluster_config` function to include the new cloud. + + +### Authentication + +Cloud authentication is handled by `sky/authentication.py`. The `setup_{cloudname}_authentication` functions will be called on every cluster provisioning request. diff --git a/docs/integration_steps/step_5-e2e-failover.md b/docs/integration_steps/step_5-e2e-failover.md new file mode 100644 index 0000000..601e981 --- /dev/null +++ b/docs/integration_steps/step_5-e2e-failover.md @@ -0,0 +1,52 @@ +# Cloud E2E Failover + +Copy the following function into `sky/backends/cloud_vm_ray_backend.py`, and be sure to fill in the empty cloud-name placeholder in `_update_blocklist_on__error` with your own cloud's name: + +```python + def _update_blocklist_on__error( + self, launchable_resources: 'resources_lib.Resources', + region: 'clouds.Region', zones: Optional[List['clouds.Zone']], + stdout: str, stderr: str): + del zones # Unused. 
+ style = colorama.Style + stdout_splits = stdout.split('\n') + stderr_splits = stderr.split('\n') + errors = [ + s.strip() + for s in stdout_splits + stderr_splits + if 'Error:' in s.strip() + ] + if not errors: + logger.info('====== stdout ======') + for s in stdout_splits: + print(s) + logger.info('====== stderr ======') + for s in stderr_splits: + print(s) + with ux_utils.print_exception_no_traceback(): + raise RuntimeError('Errors occurred during provision; ' + 'check logs above.') + + logger.warning(f'Got error(s) in {region.name}:') + messages = '\n\t'.join(errors) + logger.warning(f'{style.DIM}\t{messages}{style.RESET_ALL}') + # NOTE: you can check out other clouds' implementations of this function, + # which may intelligently block a whole zone / whole region depending on + # the errors thrown. + self._blocked_resources.add(launchable_resources.copy(zone=None)) +``` + +Within the `_update_blocklist_on_error` function, add your cloud to the `handlers` dictionary: + +```python +def _update_blocklist_on_error( + ... + handlers = { + ... + # TODO Add this + clouds.FluffyCloud: self._update_blocklist_on_fluffycloud_error, + ... + } + ... +... +``` diff --git a/fluffycloud/fluffycloud.py b/fluffycloud/fluffycloud.py index c10e706..25da13f 100644 --- a/fluffycloud/fluffycloud.py +++ b/fluffycloud/fluffycloud.py @@ -3,12 +3,15 @@ from typing import Dict, Iterator, List, Optional, Tuple from sky import clouds +from sky import status_lib from sky.clouds import service_catalog if typing.TYPE_CHECKING: # Renaming to avoid shadowing variables. 
from sky import resources as resources_lib +import fluffycloud_api as fc_api + _CREDENTIAL_FILES = [ # credential files for FluffyCloud, ] @@ -21,11 +24,12 @@ class FluffyCloud(clouds.Cloud): _CLOUD_UNSUPPORTED_FEATURES = { clouds.CloudImplementationFeatures.STOP: 'FluffyCloud does not support stopping VMs.', clouds.CloudImplementationFeatures.AUTOSTOP: 'FluffyCloud does not support stopping VMs.', - clouds.CloudImplementationFeatures.MULTI_NODE: 'Multi-node is not supported by the FluffyCloud implementation yet.', + clouds.CloudImplementationFeatures.MULTI_NODE: 'Multi-node is not supported by the FluffyCloud implementation yet.' + } ######## # TODO # ######## - _MAX_CLUSTER_NAME_LEN_LIMIT = # TODO + _MAX_CLUSTER_NAME_LEN_LIMIT = # TODO _regions: List[clouds.Region] = [] @@ -38,7 +42,6 @@ def _cloud_unsupported_features( def _max_cluster_name_length(cls) -> Optional[int]: return cls._MAX_CLUSTER_NAME_LEN_LIMIT - @classmethod def regions(cls) -> List[clouds.Region]: if not cls._regions: @@ -71,6 +74,14 @@ def regions_with_offering(cls, instance_type: Optional[str], regions = [r for r in regions if r.name == region] return regions + @classmethod + def get_vcpus_mem_from_instance_type( + cls, + instance_type: str, + ) -> Tuple[Optional[float], Optional[float]]: + # FILL_IN: cloudname + return service_catalog.get_vcpus_mem_from_instance_type(instance_type, clouds='') + @classmethod def zones_provision_loop( cls, @@ -108,12 +119,9 @@ def accelerators_to_hourly_cost(self, region: Optional[str] = None, zone: Optional[str] = None) -> float: del accelerators, use_spot, region, zone # unused - ######## - # TODO # - ######## - # This function assumes accelerators are included as part of instance - # type. If not, you will need to change this. (However, you can do - # this later; `return 0.0` is a good placeholder.) + # FILL_IN: If accelerator costs are not included in instance_type cost, + # return the cost of the accelerators here. 
If accelerators are + # included in instance_type cost, return 0.0. return 0.0 def get_egress_cost(self, num_gigabytes: float) -> float: @@ -132,10 +140,8 @@ def is_same_cloud(self, other: clouds.Cloud) -> bool: return isinstance(other, FluffyCloud) @classmethod - def get_default_instance_type(cls, - cpus: Optional[str] = None) -> Optional[str]: - return service_catalog.get_default_instance_type(cpus=cpus, - clouds='fluffycloud') + def get_default_instance_type(cls, cpus: Optional[str] = None) -> Optional[str]: + return service_catalog.get_default_instance_type(cpus=cpus, clouds='fluffycloud') @classmethod def get_accelerators_from_instance_type( @@ -178,8 +184,8 @@ def make_deploy_resources_variables( 'region': region.name, } - def get_feasible_launchable_resources(self, - resources: 'resources_lib.Resources'): + def _get_feasible_launchable_resources(self, + resources: 'resources_lib.Resources'): if resources.use_spot: return ([], []) if resources.instance_type is not None: @@ -218,7 +224,7 @@ def _make(instance_list): assert len(accelerators) == 1, resources acc, acc_count = list(accelerators.items())[0] (instance_list, fuzzy_candidate_list - ) = service_catalog.get_instance_type_for_accelerator( + ) = service_catalog.get_instance_type_for_accelerator( acc, acc_count, use_spot=resources.use_spot, @@ -267,3 +273,33 @@ def accelerator_in_region_or_zone(self, zone: Optional[str] = None) -> bool: return service_catalog.accelerator_in_region_or_zone( accelerator, acc_count, region, zone, 'fluffycloud') + + @classmethod + def query_status(cls, name: str, tag_filters: Dict[str, str], + region: Optional[str], zone: Optional[str], + **kwargs) -> List[status_lib.ClusterStatus]: + del tag_filters, region, zone, kwargs # Unused. + + # FILL_IN: For the status map, map the FluffyCloud status to the SkyPilot status. 
+ # SkyPilot status is defined in sky/status_lib.py + # Example: status_map = {'CREATING': status_lib.ClusterStatus.INIT, ...} + # The keys are the FluffyCloud status, and the values are the SkyPilot status. + status_map = { + 'CREATING': status_lib.ClusterStatus.INIT, + 'EDITING': status_lib.ClusterStatus.INIT, + 'RUNNING': status_lib.ClusterStatus.UP, + 'STARTING': status_lib.ClusterStatus.INIT, + 'RESTARTING': status_lib.ClusterStatus.INIT, + 'STOPPING': status_lib.ClusterStatus.STOPPED, + 'STOPPED': status_lib.ClusterStatus.STOPPED, + 'TERMINATING': None, + 'TERMINATED': None, + } + status_list = [] + vms = fc_api.list_instances() + for node in vms: + if node['name'] == name: + node_status = status_map[node['status']] + if node_status is not None: + status_list.append(node_status) + return status_list diff --git a/fluffycloud/fluffycloud_api.py b/fluffycloud/fluffycloud_api.py index 9fae628..9906642 100644 --- a/fluffycloud/fluffycloud_api.py +++ b/fluffycloud/fluffycloud_api.py @@ -1,27 +1,35 @@ -def launch(name:str, - instance_type:str, - region:str, - api_key:str, - ssh_key_name:str): +from typing import Dict + + +def launch(name: str, + instance_type: str, + region: str, + api_key: str, + ssh_key_name: str): """Launches an INSTANCE_TYPE instance in region REGION with given NAME. - + The instance_type refers to the type found in the catalog. + + API_KEY is a secret registered with FluffyCloud. It is per-user. - + SSH_KEY_NAME corresponds to a ssh key registered with FluffyCloud. After launching, the user can ssh into INSTANCE_TYPE with that ssh key. - + Returns INSTANCE_ID if successful, otherwise returns None. 
""" -def remove(instance_id:str, api_key:str): + +def remove(instance_id: str, api_key: str): """Removes instance with given INSTANCE_ID.""" - -def set_tags(instance_id:str, tags:Dict, api_key:str) + + +def set_tags(instance_id: str, tags: Dict, api_key: str): """Set tags for instance with given INSTANCE_ID.""" - -def list_instances(api_key:str): + + +def list_instances(api_key: str): """Lists instances associated with API_KEY. - + Returns a dictionary: { instance_id_1: diff --git a/fluffycloud/fluffycloud_catalog.py b/fluffycloud/fluffycloud_catalog.py index 8011970..5e53478 100644 --- a/fluffycloud/fluffycloud_catalog.py +++ b/fluffycloud/fluffycloud_catalog.py @@ -20,7 +20,7 @@ def validate_region_zone( if zone is not None: with ux_utils.print_exception_no_traceback(): raise ValueError('FluffyCloud does not support zones.') - return common.validate_region_zone_impl(_df, region, zone) + return common.validate_region_zone_impl('', _df, region, zone) # FILL_IN: cloudname def accelerator_in_region_or_zone(acc_name: str, @@ -47,8 +47,9 @@ def get_hourly_cost(instance_type: str, zone) -def get_vcpus_from_instance_type(instance_type: str) -> Optional[float]: - return common.get_vcpus_from_instance_type_impl(_df, instance_type) +def get_vcpus_mem_from_instance_type( + instance_type: str) -> Tuple[Optional[float], Optional[float]]: + return common.get_vcpus_mem_from_instance_type_impl(_df, instance_type) def get_default_instance_type(cpus: Optional[str] = None) -> Optional[str]: @@ -66,6 +67,7 @@ def get_instance_type_for_accelerator( acc_name: str, acc_count: int, cpus: Optional[str] = None, + memory: Optional[str] = None, use_spot: bool = False, region: Optional[str] = None, zone: Optional[str] = None) -> Tuple[Optional[List[str]], List[str]]: @@ -80,6 +82,7 @@ def get_instance_type_for_accelerator( acc_name=acc_name, acc_count=acc_count, cpus=cpus, + memory=memory, use_spot=use_spot, region=region, zone=zone) @@ -95,8 +98,9 @@ def list_accelerators( gpus_only: bool, 
name_filter: Optional[str], region_filter: Optional[str], + quantity_filter: Optional[int], case_sensitive: bool = True ) -> Dict[str, List[common.InstanceTypeInfo]]: """Returns all instance types in FluffyCloud offering GPUs.""" - return common.list_accelerators_impl('FluffyCloud', _df, gpus_only, name_filter, - case_sensitive) + return common.list_accelerators_impl( + 'FluffyCloud', _df, gpus_only, name_filter, region_filter, quantity_filter, case_sensitive) diff --git a/fluffycloud/node_provider.py b/fluffycloud/node_provider.py index de89edb..158b09a 100644 --- a/fluffycloud/node_provider.py +++ b/fluffycloud/node_provider.py @@ -1,9 +1,27 @@ +""" +This file should be placed at sky/skylet/providers/{cloudname}/node_provider.py + +Methods that start with an underscore (_) are specific to SkyPilot, +and not part of the ray NodeProvider class definition. + +Template Usage: + - Replace all the FILL_INs with your own code. +""" + import logging +from typing import Any, Dict, List, Optional + from threading import RLock from ray.autoscaler.node_provider import NodeProvider from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME + +# FILL_IN: Import your python cloud library here. +# Replace library usage found in this template. +# A wrapper/helper is recommended to keep the code clean. 
+import fluffycloud_api as fc_api + logger = logging.getLogger(__name__) @@ -11,11 +29,11 @@ class FluffyCloudError(Exception): pass -def synchronized(f): +def synchronized(func): def wrapper(self, *args, **kwargs): self.lock.acquire() try: - return f(self, *args, **kwargs) + return func(self, *args, **kwargs) finally: self.lock.release() @@ -27,104 +45,135 @@ class FluffyCloudNodeProvider(NodeProvider): def __init__(self, provider_config, cluster_name): NodeProvider.__init__(self, provider_config, cluster_name) + self.lock = RLock() self.cached_nodes = {} - - ######## - # TODO # - ######## - # Load credentials - self.api_key = # TODO Read from credentials file - self.ssh_key_name = # TODO Read from credentials file - @synchronized - def _get_filtered_nodes(self, tag_filters): - running_instances = list_instances(self.api_key) - - ######## - # TODO # - ######## - self.cached_nodes = # TODO Filter running instances by tag_filters - - return self.cached_nodes + # FILL_IN: Load credentials if needed for your cloud library. + self.api_key = # FILL_IN: Read from credentials file + self.ssh_key_name = # FILL_IN: Read from credentials file - def non_terminated_nodes(self, tag_filters): + def non_terminated_nodes(self, tag_filters: Dict[str, str]) -> List[str]: """Return a list of node ids filtered by the specified tags dict. - + This list must not include terminated nodes. For performance reasons, providers are allowed to cache the result of a call to non_terminated_nodes() to serve single-node queries - (e.g. is_running(node_id)). This means that non_terminated_nodes() + (e.g. is_running(node_id)). This means that non_terminated_nodes() must be called again to refresh results. 
""" nodes = self._get_filtered_nodes(tag_filters=tag_filters) - return [k for k, _ in nodes.items()] - def is_running(self, node_id): + # TEMPLATE ACTION: Filter out terminated nodes + + return [node_id for node_id, _ in nodes.items()] + + def is_running(self, node_id: str) -> bool: """Return whether the specified node is running.""" return self._get_cached_node(node_id=node_id) is not None - def is_terminated(self, node_id): + def is_terminated(self, node_id: str) -> bool: """Return whether the specified node is terminated.""" return self._get_cached_node(node_id=node_id) is None - def node_tags(self, node_id): + def node_tags(self, node_id: str) -> Dict[str, str]: """Returns the tags of the given node (string dict).""" return self._get_cached_node(node_id=node_id)['tags'] - def external_ip(self, node_id): + def external_ip(self, node_id: str) -> str: """Returns the external ip of the given node.""" return self._get_cached_node(node_id=node_id)['ip'] - def internal_ip(self, node_id): + def internal_ip(self, node_id: str) -> str: """Returns the internal ip (Ray ip) of the given node.""" return self._get_cached_node(node_id=node_id)['ip'] - def create_node(self, node_config, tags, count): + def create_node(self, node_config: Dict[str, Any], tags: Dict[str, str], count: int) -> Optional[Dict[str, Any]]: """Creates a number of nodes within the namespace.""" - assert count == 1, count # Only support 1-node clusters for now - # Get the tags config_tags = node_config.get('tags', {}).copy() config_tags.update(tags) config_tags[TAG_RAY_CLUSTER_NAME] = self.cluster_name - # Create node + # Create nodes ttype = node_config['InstanceType'] region = self.provider_config['region'] - vm_id = launch(name=self.cluster_name, - instance_type=ttype, - region=region, - api_key=self.api_key - ssh_key_name=self.ssh_key_name) - - if vm_id is None: + + for _ in range(count): + instance_id = fc_api.launch(name=self.cluster_name, + instance_type=ttype, + region=region, + 
api_key=self.api_key, + ssh_key_name=self.ssh_key_name) + + if instance_id is None: raise FluffyCloudError('Failed to launch instance.') - set_tags(vm_id, config_tags, self.api_key) + fc_api.set_tags(instance_id, config_tags, self.api_key) - ######## - # TODO # - ######## - # May need to poll list_instances() to wait for booting - # to finish before returning. + # FILL_IN: Only return after all nodes are booted. + # If needed poll fc_api.list_instances() to wait for status == 'running' @synchronized - def set_node_tags(self, node_id, tags): + def set_node_tags(self, node_id: str, tags: Dict[str, str]) -> None: """Sets the tag values (string dict) for the specified node.""" node = self._get_node(node_id) node['tags'].update(tags) - set_tags(vm_id, node['tags'], self.api_key) + fc_api.set_tags(node_id, node['tags'], self.api_key) # FILL_IN - def terminate_node(self, node_id): + def terminate_node(self, node_id: str) -> Optional[Dict[str, Any]]: """Terminates the specified node.""" - remove(node_id, self.api_key) + fc_api.remove(node_id, self.api_key) # FILL_IN + + @synchronized + def _get_filtered_nodes(self, tag_filters: Dict[str, str]) -> Dict[str, Any]: + """ + SkyPilot Method + Caches the nodes with the given tag_filters. + + Return Example: + { + instance_id_1: { + status: ..., + tags: ..., + name: ..., + ip: .... + }, + instance_id_2: {...}, + ... 
+ } + + Each instance needs to have a dictionary with the following keys: + - status: str + - tags: Dict[str, str] + - name: str + - ip: str + """ + instances = fc_api.list_instances(self.api_key) # FILL_IN + + new_cache = {} + for instance_id, instance in instances.items(): + if instance['status'] != 'running': + continue + # Keep nodes that match every tag filter (an empty filter matches all nodes). + if all(instance['tags'].get(key) == value for key, value in tag_filters.items()): + new_cache[instance_id] = instance - def _get_node(self, node_id): + self.cached_nodes = new_cache + return self.cached_nodes + + def _get_node(self, node_id: str): + """ + SkyPilot Method + Returns the node with the given node_id, if it exists. + """ self._get_filtered_nodes({}) # Side effect: updates cache return self.cached_nodes.get(node_id, None) def _get_cached_node(self, node_id): + """ + SkyPilot Method + Returns the node with the given node_id, if it is cached. + """ if node_id in self.cached_nodes: return self.cached_nodes[node_id] return self._get_node(node_id=node_id)
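The tag-filtering behavior that `_get_filtered_nodes` is expected to implement can be sketched in isolation: a node matches when it is running and carries every (key, value) pair in the filter, and an empty filter matches all running nodes (which is what `_get_node` relies on when it calls `_get_filtered_nodes({})`). The instance data below is fabricated for illustration:

```python
from typing import Any, Dict


def filter_nodes(instances: Dict[str, Dict[str, Any]],
                 tag_filters: Dict[str, str]) -> Dict[str, Dict[str, Any]]:
    """Keep running instances whose tags include every (key, value) filter."""
    return {
        inst_id: inst
        for inst_id, inst in instances.items()
        if inst['status'] == 'running' and all(
            inst['tags'].get(key) == value
            for key, value in tag_filters.items())
    }


fake_instances = {
    'i-1': {'status': 'running', 'tags': {'ray-cluster-name': 'sky-abc'},
            'name': 'sky-abc-head', 'ip': '203.0.113.5'},
    'i-2': {'status': 'stopped', 'tags': {'ray-cluster-name': 'sky-abc'},
            'name': 'sky-abc-worker', 'ip': '203.0.113.6'},
}

print(list(filter_nodes(fake_instances, {'ray-cluster-name': 'sky-abc'})))  # ['i-1']
print(list(filter_nodes(fake_instances, {})))  # ['i-1'] -- stopped nodes are excluded
```

Getting these semantics right matters: if an empty filter returned nothing, `is_running`/`is_terminated` lookups through the cache would misreport every node as terminated.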