Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GCP] Refactor the reserved instances cache #2836

Merged
merged 6 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sky/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1407,6 +1407,7 @@ def launch(
and they undergo job queue scheduling.
"""
# NOTE(dev): Keep the docstring consistent between the Python API and CLI.
logger.info('Launching cluster...')
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
env = _merge_env_vars(env_file, env)
controller_utils.check_cluster_name_not_controller(
cluster, operation_str='Launching tasks on it')
Expand Down
178 changes: 24 additions & 154 deletions sky/clouds/gcp.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
"""Google Cloud Platform."""
import dataclasses
import datetime
import functools
import json
import os
import re
import subprocess
import time
import typing
from typing import Any, Dict, Iterator, List, Optional, Set, Tuple
from typing import Dict, Iterator, List, Optional, Set, Tuple

import cachetools
import colorama

from sky import clouds
Expand All @@ -19,6 +16,7 @@
from sky import status_lib
from sky.adaptors import gcp
from sky.clouds import service_catalog
from sky.clouds.utils import gcp_utils
from sky.skylet import log_lib
from sky.utils import common_utils
from sky.utils import subprocess_utils
Expand Down Expand Up @@ -120,68 +118,6 @@ def is_api_disabled(endpoint: str, project_id: str) -> bool:
return proc.returncode != 0


@dataclasses.dataclass
class SpecificReservation:
count: int
in_use_count: int

@classmethod
def from_dict(cls, d: dict) -> 'SpecificReservation':
return cls(count=int(d['count']), in_use_count=int(d['inUseCount']))


class GCPReservation:
"""GCP Reservation object that contains the reservation information."""

def __init__(self, self_link: str, zone: str,
specific_reservation: SpecificReservation,
specific_reservation_required: bool) -> None:
self.self_link = self_link
self.zone = zone
self.specific_reservation = specific_reservation
self.specific_reservation_required = specific_reservation_required

@classmethod
def from_dict(cls, d: dict) -> 'GCPReservation':
return cls(
self_link=d['selfLink'],
zone=d['zone'],
specific_reservation=SpecificReservation.from_dict(
d['specificReservation']),
specific_reservation_required=d['specificReservationRequired'],
)

@property
def available_resources(self) -> int:
"""Count resources available that can be used in this reservation."""
return (self.specific_reservation.count -
self.specific_reservation.in_use_count)

def is_consumable(
self,
specific_reservations: Set[str],
) -> bool:
"""Check if the reservation is consumable.

Check if the reservation is consumable with the provided specific
reservation names. This is defined by the Consumption type.
For more details:
https://cloud.google.com/compute/docs/instances/reservations-overview#how-reservations-work
"""
return (not self.specific_reservation_required or
self.name in specific_reservations)

@property
def name(self) -> str:
"""Name derived from reservation self link.

The naming convention can be found here:
https://cloud.google.com/compute/docs/instances/reservations-consume#consuming_a_specific_shared_reservation
"""
parts = self.self_link.split('/')
return '/'.join(parts[-6:-4] + parts[-2:])


@clouds.CLOUD_REGISTRY.register
class GCP(clouds.Cloud):
"""Google Cloud Platform."""
Expand Down Expand Up @@ -225,11 +161,6 @@ class GCP(clouds.Cloud):
'https://skypilot.readthedocs.io/en/latest/getting-started/installation.html#google-cloud-platform-gcp' # pylint: disable=line-too-long
)

def __init__(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May need to test back-compat.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just tested excuting new job and launching again on existing cluster. It works correctly. Added the tests in the PR description. Thanks!

super().__init__()

self._list_reservations_cache = None

@classmethod
def _cloud_unsupported_features(
cls) -> Dict[clouds.CloudImplementationFeatures, str]:
Expand Down Expand Up @@ -633,82 +564,6 @@ def get_vcpus_mem_from_instance_type(
return service_catalog.get_vcpus_mem_from_instance_type(instance_type,
clouds='gcp')

def get_reservations_available_resources(
self,
instance_type: str,
region: str,
zone: Optional[str],
specific_reservations: Set[str],
) -> Dict[str, int]:
del region # Unused
if zone is None:
# For backward compatibility, the cluster in INIT state launched
# before #2352 may not have zone information. In this case, we
# return 0 for all reservations.
return {reservation: 0 for reservation in specific_reservations}
reservations = self._list_reservations_for_instance_type_in_zone(
instance_type, zone)

return {
r.name: r.available_resources
for r in reservations
if r.is_consumable(specific_reservations)
}

def _list_reservations_for_instance_type_in_zone(
self,
instance_type: str,
zone: str,
) -> List[GCPReservation]:
reservations = self._list_reservations_for_instance_type(instance_type)
return [r for r in reservations if r.zone.endswith(f'/{zone}')]

def _get_or_create_ttl_cache(self):
if getattr(self, '_list_reservations_cache', None) is None:
# Backward compatibility: Clusters created before #2352 has GCP
# objects serialized without the attribute. So we access it this
# way.
self._list_reservations_cache = cachetools.TTLCache(
maxsize=1,
ttl=datetime.timedelta(300),
timer=datetime.datetime.now)
return self._list_reservations_cache

@cachetools.cachedmethod(cache=lambda self: self._get_or_create_ttl_cache())
def _list_reservations_for_instance_type(
self,
instance_type: str,
) -> List[GCPReservation]:
"""List all reservations for the given instance type.

TODO: We need to incorporate accelerators because the reserved instance
can be consumed only when the instance_type + GPU type matches, and in
GCP GPUs except for A100 and L4 do not have their own instance type.
For example, if we have a specific reservation with n1-highmem-8
in us-central1-c. `sky launch --gpus V100` will fail.
"""
list_reservations_cmd = (
'gcloud compute reservations list '
f'--filter="specificReservation.instanceProperties.machineType={instance_type} AND status=READY" '
'--format="json(specificReservation.count, specificReservation.inUseCount, specificReservationRequired, selfLink, zone)"'
)
returncode, stdout, stderr = subprocess_utils.run_with_retries(
list_reservations_cmd,
# 1: means connection aborted (although it shows 22 in the error,
# but the actual error code is 1)
# Example: ERROR: gcloud crashed (ConnectionError): ('Connection aborted.', OSError(22, 'Invalid argument')) # pylint: disable=line-too-long
retry_returncode=[255, 1],
)
subprocess_utils.handle_returncode(
returncode,
list_reservations_cmd,
error_msg=
f'Failed to get list reservations for {instance_type!r}:\n{stderr}',
stderr=stderr,
stream_logs=True,
)
return [GCPReservation.from_dict(r) for r in json.loads(stdout)]

@classmethod
def _find_application_key_path(cls) -> str:
# Check the application default credentials in the environment variable.
Expand Down Expand Up @@ -1058,6 +913,28 @@ def check_quota_available(cls, resources: 'resources.Resources') -> bool:
# Quota found to be greater than zero, try provisioning
return True

def get_reservations_available_resources(
self,
instance_type: str,
region: str,
zone: Optional[str],
specific_reservations: Set[str],
) -> Dict[str, int]:
del region # Unused
if zone is None:
# For backward compatibility, the cluster in INIT state launched
# before #2352 may not have zone information. In this case, we
# return 0 for all reservations.
return {reservation: 0 for reservation in specific_reservations}
reservations = gcp_utils.list_reservations_for_instance_type_in_zone(
instance_type, zone)

return {
r.name: r.available_resources
for r in reservations
if r.is_consumable(specific_reservations)
}

@classmethod
def query_status(cls, name: str, tag_filters: Dict[str, str],
region: Optional[str], zone: Optional[str],
Expand Down Expand Up @@ -1227,10 +1104,3 @@ def delete_image(cls, image_id: str, region: Optional[str]) -> None:
error_msg=f'Failed to delete image {image_name!r}',
stderr=stderr,
stream_logs=True)

def __getstate__(self) -> Dict[str, Any]:
state = self.__dict__.copy()
# We should avoid saving third-party object to the state, as it may
# cause unpickling error when the third-party API is updated.
state.pop('_list_reservations_cache', None)
return state
117 changes: 117 additions & 0 deletions sky/clouds/utils/gcp_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""Utility functions for GCP."""
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved

import dataclasses
import json
import time
from typing import List, Set

import cachetools

from sky.utils import subprocess_utils


@dataclasses.dataclass
class SpecificReservation:
count: int
in_use_count: int

@classmethod
def from_dict(cls, d: dict) -> 'SpecificReservation':
return cls(count=int(d['count']), in_use_count=int(d['inUseCount']))


class GCPReservation:
"""GCP Reservation object that contains the reservation information."""

def __init__(self, self_link: str, zone: str,
specific_reservation: SpecificReservation,
specific_reservation_required: bool) -> None:
self.self_link = self_link
self.zone = zone
self.specific_reservation = specific_reservation
self.specific_reservation_required = specific_reservation_required

@classmethod
def from_dict(cls, d: dict) -> 'GCPReservation':
return cls(
self_link=d['selfLink'],
zone=d['zone'],
specific_reservation=SpecificReservation.from_dict(
d['specificReservation']),
specific_reservation_required=d['specificReservationRequired'],
)

@property
def available_resources(self) -> int:
"""Count resources available that can be used in this reservation."""
return (self.specific_reservation.count -
self.specific_reservation.in_use_count)

def is_consumable(
self,
specific_reservations: Set[str],
) -> bool:
"""Check if the reservation is consumable.

Check if the reservation is consumable with the provided specific
reservation names. This is defined by the Consumption type.
For more details:
https://cloud.google.com/compute/docs/instances/reservations-overview#how-reservations-work
"""
return (not self.specific_reservation_required or
self.name in specific_reservations)

@property
def name(self) -> str:
"""Name derived from reservation self link.

The naming convention can be found here:
https://cloud.google.com/compute/docs/instances/reservations-consume#consuming_a_specific_shared_reservation
"""
parts = self.self_link.split('/')
return '/'.join(parts[-6:-4] + parts[-2:])


def list_reservations_for_instance_type_in_zone(
instance_type: str,
zone: str,
) -> List[GCPReservation]:
reservations = _list_reservations_for_instance_type(instance_type)
return [r for r in reservations if r.zone.endswith(f'/{zone}')]


@cachetools.cached(cache=cachetools.TTLCache(maxsize=1,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any quick test on whether the cache is effective within a process? Asking because the args passed are slightly different than before.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I added a logging in L104 and tried to remove the decorator. With the decorator the logging will only be shown once, but without the decorator it will show the output multiple times.

Also, tried to change the ttl to 0.3, and it shows the output three times during the optimization.

ttl=300,
timer=time.time))
def _list_reservations_for_instance_type(
instance_type: str,) -> List[GCPReservation]:
"""List all reservations for the given instance type.

TODO: We need to incorporate accelerators because the reserved instance
can be consumed only when the instance_type + GPU type matches, and in
GCP GPUs except for A100 and L4 do not have their own instance type.
For example, if we have a specific reservation with n1-highmem-8
in us-central1-c. `sky launch --gpus V100` will fail.
"""
list_reservations_cmd = (
'gcloud compute reservations list '
'--filter="specificReservation.instanceProperties.machineType='
f'{instance_type} AND status=READY" --format="json('
'specificReservation.count, specificReservation.inUseCount, '
'specificReservationRequired, selfLink, zone)"')
returncode, stdout, stderr = subprocess_utils.run_with_retries(
list_reservations_cmd,
# 1: means connection aborted (although it shows 22 in the error,
# but the actual error code is 1)
# Example: ERROR: gcloud crashed (ConnectionError): ('Connection aborted.', OSError(22, 'Invalid argument')) # pylint: disable=line-too-long
retry_returncode=[255, 1],
)
subprocess_utils.handle_returncode(
returncode,
list_reservations_cmd,
error_msg=
f'Failed to get list reservations for {instance_type!r}:\n{stderr}',
stderr=stderr,
stream_logs=True,
)
return [GCPReservation.from_dict(r) for r in json.loads(stdout)]
2 changes: 1 addition & 1 deletion tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def _get_az_mappings(_):
lambda _: None)

monkeypatch.setattr(
'sky.clouds.gcp.GCP._list_reservations_for_instance_type',
'sky.clouds.utils.gcp_utils.list_reservations_for_instance_type',
lambda *_args, **_kwargs: [])

# Monkey patch Kubernetes resource detection since it queries
Expand Down
Loading
Loading