-
Notifications
You must be signed in to change notification settings - Fork 541
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[AWS] Adopt new provisioner to query clusters #2288
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -11,8 +11,8 @@ | |||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
from sky import clouds | ||||||||||||||||||||||||||||
from sky import exceptions | ||||||||||||||||||||||||||||
from sky import provision as provision_lib | ||||||||||||||||||||||||||||
from sky import sky_logging | ||||||||||||||||||||||||||||
from sky import status_lib | ||||||||||||||||||||||||||||
from sky.adaptors import aws | ||||||||||||||||||||||||||||
from sky.clouds import service_catalog | ||||||||||||||||||||||||||||
from sky.utils import common_utils | ||||||||||||||||||||||||||||
|
@@ -23,6 +23,7 @@ | |||||||||||||||||||||||||||
if typing.TYPE_CHECKING: | ||||||||||||||||||||||||||||
# renaming to avoid shadowing variables | ||||||||||||||||||||||||||||
from sky import resources as resources_lib | ||||||||||||||||||||||||||||
from sky import status_lib | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
logger = sky_logging.init_logger(__name__) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
|
@@ -741,80 +742,30 @@ def check_quota_available(cls, | |||||||||||||||||||||||||||
# Quota found to be greater than zero, try provisioning | ||||||||||||||||||||||||||||
return True | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
@classmethod | ||||||||||||||||||||||||||||
def _query_instance_property_with_retries( | ||||||||||||||||||||||||||||
cls, | ||||||||||||||||||||||||||||
tag_filters: Dict[str, str], | ||||||||||||||||||||||||||||
region: str, | ||||||||||||||||||||||||||||
query: str, | ||||||||||||||||||||||||||||
) -> Tuple[int, str, str]: | ||||||||||||||||||||||||||||
filter_str = ' '.join(f'Name=tag:{key},Values={value}' | ||||||||||||||||||||||||||||
for key, value in tag_filters.items()) | ||||||||||||||||||||||||||||
query_cmd = (f'aws ec2 describe-instances --filters {filter_str} ' | ||||||||||||||||||||||||||||
f'--region {region} --query "{query}" --output json') | ||||||||||||||||||||||||||||
returncode, stdout, stderr = subprocess_utils.run_with_retries( | ||||||||||||||||||||||||||||
query_cmd, | ||||||||||||||||||||||||||||
retry_returncode=[255], | ||||||||||||||||||||||||||||
retry_stderrs=[ | ||||||||||||||||||||||||||||
'Unable to locate credentials. You can configure credentials by ' | ||||||||||||||||||||||||||||
'running "aws configure"' | ||||||||||||||||||||||||||||
]) | ||||||||||||||||||||||||||||
Comment on lines
-758
to
-761
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When reviewing #2314, I realized this code (originally added in #1988) was accidentally left out from this PR/master branch. Could we add it back? #1988 has context. Tldr: previously users have encountered "ec2 describe-instances" throwing NoCredentialsError with this message (the programmatic client may only throw the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, this reminds of another problem. Previously, we retry by issuing another CLI cmd "ec2 describe-instances" with a subprocess. This probably means a new underlying boto3 client is created for each retry. This could be the reason the retry has mitigated this problem. E.g., abandoning a malfunctioning client. With this PR, even if we add retry back, it'll access Lines 64 to 76 in 0bdfc31
This is all speculation since we don't have a reliable repro. That said, could we somehow force create a new boto3 client when we retry? |
||||||||||||||||||||||||||||
return returncode, stdout, stderr | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
@classmethod | ||||||||||||||||||||||||||||
def query_status(cls, name: str, tag_filters: Dict[str, str], | ||||||||||||||||||||||||||||
region: Optional[str], zone: Optional[str], | ||||||||||||||||||||||||||||
**kwargs) -> List['status_lib.ClusterStatus']: | ||||||||||||||||||||||||||||
del zone # unused | ||||||||||||||||||||||||||||
status_map = { | ||||||||||||||||||||||||||||
'pending': status_lib.ClusterStatus.INIT, | ||||||||||||||||||||||||||||
'running': status_lib.ClusterStatus.UP, | ||||||||||||||||||||||||||||
# TODO(zhwu): stopping and shutting-down could occasionally fail | ||||||||||||||||||||||||||||
# due to internal errors of AWS. We should cover that case. | ||||||||||||||||||||||||||||
'stopping': status_lib.ClusterStatus.STOPPED, | ||||||||||||||||||||||||||||
'stopped': status_lib.ClusterStatus.STOPPED, | ||||||||||||||||||||||||||||
'shutting-down': None, | ||||||||||||||||||||||||||||
'terminated': None, | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
assert region is not None, (tag_filters, region) | ||||||||||||||||||||||||||||
returncode, stdout, stderr = cls._query_instance_property_with_retries( | ||||||||||||||||||||||||||||
tag_filters, region, query='Reservations[].Instances[].State.Name') | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
if returncode != 0: | ||||||||||||||||||||||||||||
with ux_utils.print_exception_no_traceback(): | ||||||||||||||||||||||||||||
raise exceptions.ClusterStatusFetchingError( | ||||||||||||||||||||||||||||
f'Failed to query AWS cluster {name!r} status: ' | ||||||||||||||||||||||||||||
f'{stdout + stderr}') | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
original_statuses = json.loads(stdout.strip()) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
statuses = [] | ||||||||||||||||||||||||||||
for s in original_statuses: | ||||||||||||||||||||||||||||
node_status = status_map[s] | ||||||||||||||||||||||||||||
if node_status is not None: | ||||||||||||||||||||||||||||
statuses.append(node_status) | ||||||||||||||||||||||||||||
return statuses | ||||||||||||||||||||||||||||
# TODO(suquark): deprecate this method | ||||||||||||||||||||||||||||
assert False, 'This could path should not be used.' | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
@classmethod | ||||||||||||||||||||||||||||
def create_image_from_cluster(cls, cluster_name: str, | ||||||||||||||||||||||||||||
tag_filters: Dict[str, | ||||||||||||||||||||||||||||
str], region: Optional[str], | ||||||||||||||||||||||||||||
zone: Optional[str]) -> str: | ||||||||||||||||||||||||||||
del zone # unused | ||||||||||||||||||||||||||||
assert region is not None, (tag_filters, region) | ||||||||||||||||||||||||||||
del tag_filters, zone # unused | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
image_name = f'skypilot-{cluster_name}-{int(time.time())}' | ||||||||||||||||||||||||||||
returncode, stdout, stderr = cls._query_instance_property_with_retries( | ||||||||||||||||||||||||||||
tag_filters, region, query='Reservations[].Instances[].InstanceId') | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
subprocess_utils.handle_returncode( | ||||||||||||||||||||||||||||
returncode, | ||||||||||||||||||||||||||||
'', | ||||||||||||||||||||||||||||
error_msg='Failed to find the source cluster on AWS.', | ||||||||||||||||||||||||||||
stderr=stderr, | ||||||||||||||||||||||||||||
stream_logs=False) | ||||||||||||||||||||||||||||
status = provision_lib.query_instances('AWS', cluster_name, | ||||||||||||||||||||||||||||
{'region': region}) | ||||||||||||||||||||||||||||
instance_ids = list(status.keys()) | ||||||||||||||||||||||||||||
if not instance_ids: | ||||||||||||||||||||||||||||
with ux_utils.print_exception_no_traceback(): | ||||||||||||||||||||||||||||
raise RuntimeError('Failed to find the source cluster on AWS.') | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
instance_ids = json.loads(stdout.strip()) | ||||||||||||||||||||||||||||
if len(instance_ids) != 1: | ||||||||||||||||||||||||||||
with ux_utils.print_exception_no_traceback(): | ||||||||||||||||||||||||||||
raise exceptions.NotSupportedError( | ||||||||||||||||||||||||||||
|
@@ -845,7 +796,7 @@ def create_image_from_cluster(cls, cluster_name: str, | |||||||||||||||||||||||||||
wait_image_cmd = ( | ||||||||||||||||||||||||||||
f'aws ec2 wait image-available --region {region} --image-ids {image_id}' | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
returncode, stdout, stderr = subprocess_utils.run_with_retries( | ||||||||||||||||||||||||||||
returncode, _, stderr = subprocess_utils.run_with_retries( | ||||||||||||||||||||||||||||
wait_image_cmd, | ||||||||||||||||||||||||||||
retry_returncode=[255], | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,8 @@ | |
import importlib | ||
import inspect | ||
|
||
from sky import status_lib | ||
suquark marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
def _route_to_cloud_impl(func): | ||
|
||
|
@@ -36,6 +38,23 @@ def _wrapper(*args, **kwargs): | |
# TODO(suquark): Bring all other functions here from the | ||
|
||
|
||
@_route_to_cloud_impl | ||
def query_instances( | ||
provider_name: str, | ||
cluster_name: str, | ||
provider_config: Optional[Dict[str, Any]] = None, | ||
non_terminated_only: bool = True, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For discussion: Does it make sense to not expose this and always assume non_terminated_only=True? Will there be callers who would want this to be False? stop() and terminate() for example already implicitly assume non-terminated, e.g.,
Also similar to node providers' design of get_nonterminated_nodes(). We can certainly leave this for the future. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let me leave a comment about it |
||
) -> Dict[str, Optional[status_lib.ClusterStatus]]: | ||
"""Query instances. | ||
|
||
Returns a dictionary of instance IDs and status. | ||
suquark marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
A None status means the instance is marked as "terminated" | ||
or "terminating". | ||
""" | ||
raise NotImplementedError | ||
|
||
|
||
@_route_to_cloud_impl | ||
def stop_instances( | ||
provider_name: str, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
"""AWS provisioner for SkyPilot.""" | ||
|
||
from sky.provision.aws.instance import stop_instances, terminate_instances | ||
from sky.provision.aws.instance import (query_instances, terminate_instances, | ||
stop_instances) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What exceptions can be thrown by provision_lib.query_instances()? Should we document that?
Also, how would the caller of _query_cluster_status_via_cloud_api() handle them?
Does the previous codepath allow throwing any exceptions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the previous codepath only checks the returncode of aws cli. so it catches general exceptions. We inherit this behavior here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a
to
Just some code gardening.