diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 2b36d95dde9..81a43cb49bb 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -19,6 +19,7 @@ jobs: - tests/test_cli.py - tests/test_config.py - tests/test_global_user_state.py + - tests/test_jobs.py - tests/test_list_accelerators.py - tests/test_optimizer_dryruns.py - tests/test_optimizer_random_dag.py diff --git a/sky/backends/backend.py b/sky/backends/backend.py index dc9f5f1e025..1dbc4ad00f3 100644 --- a/sky/backends/backend.py +++ b/sky/backends/backend.py @@ -82,12 +82,15 @@ def add_storage_objects(self, task: 'task_lib.Task') -> None: @timeline.event @usage_lib.messages.usage.update_runtime('execute') - def execute(self, handle: _ResourceHandleType, task: 'task_lib.Task', - detach_run: bool) -> None: + def execute(self, + handle: _ResourceHandleType, + task: 'task_lib.Task', + detach_run: bool, + dryrun: bool = False) -> None: usage_lib.record_cluster_name_for_current_operation( handle.get_cluster_name()) usage_lib.messages.usage.update_actual_task(task) - return self._execute(handle, task, detach_run) + return self._execute(handle, task, detach_run, dryrun) @timeline.event def post_execute(self, handle: _ResourceHandleType, down: bool) -> None: @@ -136,8 +139,11 @@ def _setup(self, handle: _ResourceHandleType, task: 'task_lib.Task', detach_setup: bool) -> None: raise NotImplementedError - def _execute(self, handle: _ResourceHandleType, task: 'task_lib.Task', - detach_run: bool) -> None: + def _execute(self, + handle: _ResourceHandleType, + task: 'task_lib.Task', + detach_run: bool, + dryrun: bool = False) -> None: raise NotImplementedError def _post_execute(self, handle: _ResourceHandleType, down: bool) -> None: diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index 03eb666836b..7f95144525b 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -2136,6 +2136,7 @@ def check_cluster_available( *, operation: str, check_cloud_vm_ray_backend: Literal[True] = True, + dryrun: bool = ..., ) -> 'cloud_vm_ray_backend.CloudVmRayResourceHandle': ... @@ -2146,6 +2147,7 @@ def check_cluster_available( *, operation: str, check_cloud_vm_ray_backend: Literal[False], + dryrun: bool = ..., ) -> backends.ResourceHandle: ... @@ -2155,6 +2157,7 @@ def check_cluster_available( *, operation: str, check_cloud_vm_ray_backend: bool = True, + dryrun: bool = False, ) -> backends.ResourceHandle: """Check if the cluster is available. @@ -2168,6 +2171,10 @@ def check_cluster_available( exceptions.CloudUserIdentityError: if we fail to get the current user identity. """ + if dryrun: + record = global_user_state.get_cluster_from_name(cluster_name) + assert record is not None, cluster_name + return record['handle'] try: cluster_status, handle = refresh_cluster_status_handle(cluster_name) except exceptions.ClusterStatusFetchingError as e: diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 6f98a19c1e4..e8019748e60 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -1440,7 +1440,7 @@ def _retry_zones( f'Failed to find catalog in region {region.name}: {e}') continue if dryrun: - return + return config_dict cluster_config_file = config_dict['ray'] # Record early, so if anything goes wrong, 'sky status' will show @@ -2414,10 +2414,9 @@ def _provision( to_provision, task.num_nodes, prev_cluster_status=None) - if not dryrun: # dry run doesn't need to check existing cluster. 
- # Try to launch the exiting cluster first - to_provision_config = self._check_existing_cluster( - task, to_provision, cluster_name) + # Try to launch the existing cluster first + to_provision_config = self._check_existing_cluster( + task, to_provision, cluster_name, dryrun) assert to_provision_config.resources is not None, ( 'to_provision should not be None', to_provision_config) @@ -2495,7 +2494,8 @@ def _provision( error_message, failover_history=e.failover_history) from None if dryrun: - return None + record = global_user_state.get_cluster_from_name(cluster_name) + return record['handle'] if record is not None else None cluster_config_file = config_dict['ray'] handle = CloudVmRayResourceHandle( @@ -3008,6 +3008,7 @@ def _execute( handle: CloudVmRayResourceHandle, task: task_lib.Task, detach_run: bool, + dryrun: bool = False, ) -> None: if task.run is None: logger.info('Run commands not specified or empty.') @@ -3017,6 +3018,11 @@ def _execute( self.check_resources_fit_cluster(handle, task) resources_str = backend_utils.get_task_resources_str(task) + + if dryrun: + logger.info(f'Dryrun complete. Would have run:\n{task}') + return + job_id = self._add_job(handle, task.name, resources_str) is_tpu_vm_pod = tpu_utils.is_tpu_vm_pod(handle.launched_resources) @@ -3767,9 +3773,11 @@ def run_on_head( @timeline.event def _check_existing_cluster( - self, task: task_lib.Task, + self, + task: task_lib.Task, to_provision: Optional[resources_lib.Resources], - cluster_name: str) -> RetryingVmProvisioner.ToProvisionConfig: + cluster_name: str, + dryrun: bool = False) -> RetryingVmProvisioner.ToProvisionConfig: """Checks if the cluster exists and returns the provision config. Raises: @@ -3782,25 +3790,30 @@ def _check_existing_cluster( handle_before_refresh = None if record is None else record['handle'] status_before_refresh = None if record is None else record['status'] - prev_cluster_status, handle = ( - backend_utils.refresh_cluster_status_handle( - cluster_name, - # We force refresh for the init status to determine the actual - # state of a previous cluster in INIT state. - # - # This is important for the case, where an existing cluster is - # transitioned into INIT state due to key interruption during - # launching, with the following steps: - # (1) launch, after answering prompt immediately ctrl-c; - # (2) launch again. - # If we don't refresh the state of the cluster and reset it back - # to STOPPED, our failover logic will consider it as an abnormal - # cluster after hitting resources capacity limit on the cloud, - # and will start failover. This is not desired, because the user - # may want to keep the data on the disk of that cluster. - force_refresh_statuses={status_lib.ClusterStatus.INIT}, - acquire_per_cluster_status_lock=False, - )) + prev_cluster_status, handle = (status_before_refresh, + handle_before_refresh) + + if not dryrun: + prev_cluster_status, handle = ( + backend_utils.refresh_cluster_status_handle( + cluster_name, + # We force refresh for the init status to determine the + # actual state of a previous cluster in INIT state. + # + # This is important for the case, where an existing cluster + # is transitioned into INIT state due to key interruption + # during launching, with the following steps: + # (1) launch, after answering prompt immediately ctrl-c; + # (2) launch again.
+ # If we don't refresh the state of the cluster and reset it + # back to STOPPED, our failover logic will consider it as an + # abnormal cluster after hitting resources capacity limit on + # the cloud, and will start failover. This is not desired, + # because the user may want to keep the data on the disk of + # that cluster. + force_refresh_statuses={status_lib.ClusterStatus.INIT}, + acquire_per_cluster_status_lock=False, + )) if prev_cluster_status is not None: assert handle is not None # Cluster already exists. diff --git a/sky/backends/local_docker_backend.py b/sky/backends/local_docker_backend.py index 1293e538a47..9b2f2651705 100644 --- a/sky/backends/local_docker_backend.py +++ b/sky/backends/local_docker_backend.py @@ -265,8 +265,11 @@ def _setup(self, handle: LocalDockerResourceHandle, task: 'task_lib.Task', requested_resources=task.resources, ready=True) - def _execute(self, handle: LocalDockerResourceHandle, task: 'task_lib.Task', - detach_run: bool) -> None: + def _execute(self, + handle: LocalDockerResourceHandle, + task: 'task_lib.Task', + detach_run: bool, + dryrun: bool = False) -> None: """ Launches the container.""" if detach_run: @@ -283,6 +286,10 @@ def _execute(self, handle: LocalDockerResourceHandle, task: 'task_lib.Task', logger.info(f'Nothing to run; run command not specified:\n{task}') return + if dryrun: + logger.info(f'Dryrun complete. Would have run:\n{task}') + return + self._execute_task_one_node(handle, task) def _post_execute(self, handle: LocalDockerResourceHandle, diff --git a/sky/clouds/aws.py b/sky/clouds/aws.py index 43d46a2da4a..909978e855b 100644 --- a/sky/clouds/aws.py +++ b/sky/clouds/aws.py @@ -360,8 +360,8 @@ def make_deploy_resources_variables( **AWS._get_disk_specs(r.disk_tier) } - def get_feasible_launchable_resources(self, - resources: 'resources_lib.Resources'): + def _get_feasible_launchable_resources( + self, resources: 'resources_lib.Resources'): if resources.instance_type is not None: assert resources.is_launchable(), resources # Treat Resources(AWS, p3.2x, V100) as Resources(AWS, p3.2x). diff --git a/sky/clouds/azure.py b/sky/clouds/azure.py index bcd2c2847dc..c6451f152e7 100644 --- a/sky/clouds/azure.py +++ b/sky/clouds/azure.py @@ -248,7 +248,7 @@ def make_deploy_resources_variables( 'disk_tier': Azure._get_disk_type(r.disk_tier) } - def get_feasible_launchable_resources(self, resources): + def _get_feasible_launchable_resources(self, resources): def failover_disk_tier( instance_type: str, @@ -288,11 +288,15 @@ def failover_disk_tier( return ([], []) if resources.instance_type is not None: assert resources.is_launchable(), resources - # Treat Resources(AWS, p3.2x, V100) as Resources(AWS, p3.2x). + ok, disk_tier = failover_disk_tier(resources.instance_type, + resources.disk_tier) + if not ok: + return ([], []) + # Treat Resources(Azure, Standard_NC4as_T4_v3, T4) as + # Resources(Azure, Standard_NC4as_T4_v3). 
resources = resources.copy( accelerators=None, - disk_tier=failover_disk_tier(resources.instance_type, - resources.disk_tier), + disk_tier=disk_tier, ) return ([resources], []) diff --git a/sky/clouds/cloud.py b/sky/clouds/cloud.py index b4361d95b3e..b963bd8240d 100644 --- a/sky/clouds/cloud.py +++ b/sky/clouds/cloud.py @@ -12,7 +12,7 @@ if typing.TYPE_CHECKING: from sky import status_lib - from sky import resources + from sky import resources as resources_lib class CloudImplementationFeatures(enum.Enum): @@ -219,7 +219,7 @@ def is_same_cloud(self, other): def make_deploy_resources_variables( self, - resources: 'resources.Resources', + resources: 'resources_lib.Resources', region: 'Region', zones: Optional[List['Zone']], ) -> Dict[str, Optional[str]]: @@ -294,6 +294,11 @@ def get_feasible_launchable_resources(self, resources): Launchable resources require a cloud and an instance type be assigned. """ + if resources.is_launchable(): + self._check_instance_type_accelerators_combination(resources) + return self._get_feasible_launchable_resources(resources) + + def _get_feasible_launchable_resources(self, resources): raise NotImplementedError @classmethod @@ -395,8 +400,8 @@ def accelerator_in_region_or_zone(self, """Returns whether the accelerator is valid in the region or zone.""" raise NotImplementedError - def need_cleanup_after_preemption(self, - resource: 'resources.Resources') -> bool: + def need_cleanup_after_preemption( + self, resource: 'resources_lib.Resources') -> bool: """Returns whether a spot resource needs cleanup after preeemption. In most cases, spot resources do not need cleanup after preemption, @@ -478,7 +483,54 @@ def check_disk_tier_enabled(cls, instance_type: str, raise NotImplementedError @classmethod - # pylint: disable=unused-argument + def _check_instance_type_accelerators_combination( + cls, resources: 'resources_lib.Resources') -> None: + """Errors out if the accelerator is not supported by the instance type. + + This function is overridden by GCP for host-accelerator logic. + + Raises: + ResourcesMismatchError: If the accelerator is not supported. + """ + assert resources.is_launchable(), resources + + def _equal_accelerators( + acc_requested: Optional[Dict[str, int]], + acc_from_instance_type: Optional[Dict[str, int]]) -> bool: + """Check whether the requested accelerators match the instance type + + Check whether the requested accelerators match the accelerators + from the instance type (both the accelerator type and the + count).
+ """ + if acc_requested is None: + return acc_from_instance_type is None + if acc_from_instance_type is None: + return False + + for acc in acc_requested: + if acc not in acc_from_instance_type: + return False + if acc_requested[acc] != acc_from_instance_type[acc]: + return False + return True + + acc_from_instance_type = (cls.get_accelerators_from_instance_type( + resources.instance_type)) + if not _equal_accelerators(resources.accelerators, + acc_from_instance_type): + with ux_utils.print_exception_no_traceback(): + raise exceptions.ResourcesMismatchError( + 'Infeasible resource demands found:' + '\n Instance type requested: ' + f'{resources.instance_type}\n' + f' Accelerators for {resources.instance_type}: ' + f'{acc_from_instance_type}\n' + f' Accelerators requested: {resources.accelerators}\n' + f'To fix: either only specify instance_type, or ' + 'change the accelerators field to be consistent.') + + @classmethod def check_quota_available(cls, region: str, instance_type: str, @@ -526,6 +578,7 @@ def check_quota_available(cls, Returns: False if the quota is found to be zero, and true otherwise. """ + del region, instance_type, use_spot # unused return True diff --git a/sky/clouds/gcp.py b/sky/clouds/gcp.py index 2ba220fc0ca..e68c9f06bdc 100644 --- a/sky/clouds/gcp.py +++ b/sky/clouds/gcp.py @@ -394,7 +394,7 @@ def make_deploy_resources_variables( return resources_vars - def get_feasible_launchable_resources(self, resources): + def _get_feasible_launchable_resources(self, resources): if resources.instance_type is not None: assert resources.is_launchable(), resources return ([resources], []) @@ -766,18 +766,12 @@ def get_project_id(cls, dryrun: bool = False) -> str: return project_id @staticmethod - def check_host_accelerator_compatibility( - instance_type: str, accelerators: Optional[Dict[str, int]]) -> None: - service_catalog.check_host_accelerator_compatibility( - instance_type, accelerators, 'gcp') - - @staticmethod - def check_accelerator_attachable_to_host( - instance_type: str, - accelerators: Optional[Dict[str, int]], - zone: Optional[str] = None) -> None: + def _check_instance_type_accelerators_combination( + resources: 'resources.Resources') -> None: + assert resources.is_launchable(), resources service_catalog.check_accelerator_attachable_to_host( - instance_type, accelerators, zone, 'gcp') + resources.instance_type, resources.accelerators, resources.zone, + 'gcp') @classmethod def check_disk_tier_enabled(cls, instance_type: str, diff --git a/sky/clouds/ibm.py b/sky/clouds/ibm.py index 90b709c0589..53887aa1805 100644 --- a/sky/clouds/ibm.py +++ b/sky/clouds/ibm.py @@ -246,8 +246,8 @@ def get_default_instance_type( disk_tier=disk_tier, clouds='ibm') - def get_feasible_launchable_resources(self, - resources: 'resources_lib.Resources'): + def _get_feasible_launchable_resources( + self, resources: 'resources_lib.Resources'): """Returns a list of feasible and launchable resources. 
Feasible resources refer to an offering respecting the resource diff --git a/sky/clouds/lambda_cloud.py b/sky/clouds/lambda_cloud.py index 5f77673748b..7ea92601e40 100644 --- a/sky/clouds/lambda_cloud.py +++ b/sky/clouds/lambda_cloud.py @@ -163,8 +163,8 @@ def make_deploy_resources_variables( 'region': region.name, } - def get_feasible_launchable_resources(self, - resources: 'resources_lib.Resources'): + def _get_feasible_launchable_resources( + self, resources: 'resources_lib.Resources'): if resources.use_spot or resources.disk_tier is not None: return ([], []) if resources.instance_type is not None: diff --git a/sky/clouds/local.py b/sky/clouds/local.py index f7881933b75..64d561067d1 100644 --- a/sky/clouds/local.py +++ b/sky/clouds/local.py @@ -134,8 +134,8 @@ def make_deploy_resources_variables( zones: Optional[List['clouds.Zone']]) -> Dict[str, Optional[str]]: return {} - def get_feasible_launchable_resources(self, - resources: 'resources_lib.Resources'): + def _get_feasible_launchable_resources( + self, resources: 'resources_lib.Resources'): if resources.disk_tier is not None: return ([], []) # The entire local cluster's resources is considered launchable, as the diff --git a/sky/clouds/oci.py b/sky/clouds/oci.py index 2fe28d21825..34f2560754b 100644 --- a/sky/clouds/oci.py +++ b/sky/clouds/oci.py @@ -275,8 +275,8 @@ def make_deploy_resources_variables( 'use_spot': resources.use_spot } - def get_feasible_launchable_resources(self, - resources: 'resources_lib.Resources'): + def _get_feasible_launchable_resources( + self, resources: 'resources_lib.Resources'): if resources.instance_type is not None: assert resources.is_launchable(), resources resources = resources.copy(accelerators=None) diff --git a/sky/clouds/scp.py b/sky/clouds/scp.py index 263b392a454..2a19e33350e 100644 --- a/sky/clouds/scp.py +++ b/sky/clouds/scp.py @@ -228,10 +228,14 @@ def _get_default_ami(cls, region_name: str, instance_type: str) -> str: 'No image found in catalog for region ' f'{region_name}. Try setting a valid image_id.') - def get_feasible_launchable_resources(self, - resources: 'resources_lib.Resources'): + def _get_feasible_launchable_resources( + self, resources: 'resources_lib.Resources'): if resources.use_spot or resources.disk_tier is not None: return ([], []) + # Check if the host VM satisfies the min/max disk size limits. + is_allowed = self._is_disk_size_allowed(resources) + if not is_allowed: + return ([], []) if resources.instance_type is not None: assert resources.is_launchable(), resources # Accelerators are part of the instance type in SCP Cloud @@ -326,7 +330,7 @@ def accelerator_in_region_or_zone(self, accelerator, acc_count, region, zone, 'scp') @staticmethod - def is_disk_size_allowed(resources): + def _is_disk_size_allowed(resources): if (resources.disk_size and (resources.disk_size < _SCP_MIN_DISK_SIZE_GB or resources.disk_size > _SCP_MAX_DISK_SIZE_GB)): @@ -334,8 +338,8 @@ def is_disk_size_allowed(resources): f' {_SCP_MIN_DISK_SIZE_GB} GB ' f'and {_SCP_MAX_DISK_SIZE_GB} GB. 
' f'Input: {resources.disk_size}') - return False, [] - return True, [resources] + return False + return True @classmethod def query_status(cls, name: str, tag_filters: Dict[str, str], diff --git a/sky/clouds/service_catalog/__init__.py b/sky/clouds/service_catalog/__init__.py index 3116831b8ba..3a1150f4ed6 100644 --- a/sky/clouds/service_catalog/__init__.py +++ b/sky/clouds/service_catalog/__init__.py @@ -260,22 +260,6 @@ def get_region_zones_for_accelerators( acc_name, acc_count, use_spot) -def check_host_accelerator_compatibility(instance_type: str, - accelerators: Optional[Dict[str, int]], - clouds: CloudFilter = None) -> None: - """GCP only: Check if host VM type is compatible with the accelerators. - - This function is invoked whenever a Resources object is created. - This function ensures that TPUs and GPUs (except A100) are attached to N1, - and A100 GPUs are attached to A2 machines. However, it does NOT check - the maximum vCPU count and maximum memory limits for the accelerators - because any Resources like GCP(n1-highmem-64, {'V100': 0.01}) can be valid - for sky exec/launch on an existing cluster. - """ - _map_clouds_catalog(clouds, 'check_host_accelerator_compatibility', - instance_type, accelerators) - - def check_accelerator_attachable_to_host(instance_type: str, accelerators: Optional[Dict[str, int]], zone: Optional[str] = None, diff --git a/sky/clouds/service_catalog/aws_catalog.py b/sky/clouds/service_catalog/aws_catalog.py index ef6102d5cbb..fb0b39c3bea 100644 --- a/sky/clouds/service_catalog/aws_catalog.py +++ b/sky/clouds/service_catalog/aws_catalog.py @@ -68,6 +68,24 @@ pull_frequency_hours=_PULL_FREQUENCY_HOURS) +def _get_az_mappings(aws_user_hash: str) -> Optional[pd.DataFrame]: + az_mapping_path = common.get_catalog_path( + f'aws/az_mappings-{aws_user_hash}.csv') + if not os.path.exists(az_mapping_path): + az_mappings = None + if aws_user_hash != 'default': + # Fetch az mapping from AWS. + logger.info(f'{colorama.Style.DIM}Fetching availability zones ' + f'mapping for AWS...{colorama.Style.RESET_ALL}') + az_mappings = fetch_aws.fetch_availability_zone_mappings() + else: + return None + az_mappings.to_csv(az_mapping_path, index=False) + else: + az_mappings = pd.read_csv(az_mapping_path) + return az_mappings + + def _fetch_and_apply_az_mapping(df: pd.DataFrame) -> pd.DataFrame: """Maps zone IDs (use1-az1) to zone names (us-east-1x). @@ -114,23 +132,12 @@ def _fetch_and_apply_az_mapping(df: pd.DataFrame) -> pd.DataFrame: 'Failed to get AWS user identity. Using the latest mapping ' f'file for user {aws_user_hash!r}.') - az_mapping_path = common.get_catalog_path( - f'aws/az_mappings-{aws_user_hash}.csv') - if not os.path.exists(az_mapping_path): - az_mappings = None - if aws_user_hash != 'default': - # Fetch az mapping from AWS. - logger.info(f'{colorama.Style.DIM}Fetching availability zones ' - f'mapping for AWS...{colorama.Style.RESET_ALL}') - az_mappings = fetch_aws.fetch_availability_zone_mappings() - else: - # Returning the original dataframe directly, as no cloud - # identity can be fetched which suggests there are no - # credentials. - return df - az_mappings.to_csv(az_mapping_path, index=False) - else: - az_mappings = pd.read_csv(az_mapping_path) + az_mappings = _get_az_mappings(aws_user_hash) + if az_mappings is None: + # Returning the original dataframe directly, as no cloud + # identity can be fetched which suggests there are no + # credentials. 
+ return df # Use inner join to drop rows with unknown AZ IDs, which are likely # because the user does not have access to that Region. Otherwise, # there will be rows with NaN in the AvailabilityZone column. diff --git a/sky/clouds/service_catalog/common.py b/sky/clouds/service_catalog/common.py index e55f5b33506..040a640a2c5 100644 --- a/sky/clouds/service_catalog/common.py +++ b/sky/clouds/service_catalog/common.py @@ -189,7 +189,7 @@ def _get_all_supported_regions_str() -> str: filter_df = df if region is not None: - filter_df = filter_df[filter_df['Region'].str.lower() == region.lower()] + filter_df = _filter_region_zone(filter_df, region, zone=None) if len(filter_df) == 0: with ux_utils.print_exception_no_traceback(): error_msg = (f'Invalid region {region!r}') @@ -335,6 +335,15 @@ def _filter_with_mem(df: pd.DataFrame, return df[df['MemoryGiB'] == memory] +def _filter_region_zone(df: pd.DataFrame, region: Optional[str], + zone: Optional[str]) -> pd.DataFrame: + if region is not None: + df = df[df['Region'].str.lower() == region.lower()] + if zone is not None: + df = df[df['AvailabilityZone'].str.lower() == zone.lower()] + return df + + def get_instance_type_for_cpus_mem_impl( df: pd.DataFrame, cpus: Optional[str], memory_gb_or_ratio: Optional[str]) -> Optional[str]: @@ -391,10 +400,12 @@ def get_instance_type_for_accelerator_impl( """ result = df[(df['AcceleratorName'].str.fullmatch(acc_name, case=False)) & (df['AcceleratorCount'] == acc_count)] + result = _filter_region_zone(result, region, zone) if len(result) == 0: fuzzy_result = df[ (df['AcceleratorName'].str.contains(acc_name, case=False)) & (df['AcceleratorCount'] >= acc_count)] + fuzzy_result = _filter_region_zone(fuzzy_result, region, zone) fuzzy_result = fuzzy_result.sort_values('Price', ascending=True) fuzzy_result = fuzzy_result[['AcceleratorName', 'AcceleratorCount']].drop_duplicates() @@ -407,11 +418,7 @@ def get_instance_type_for_accelerator_impl( result = _filter_with_cpus(result, cpus) result = _filter_with_mem(result, memory) - if region is not None: - result = result[result['Region'].str.lower() == region] - if zone is not None: - # NOTE: For Azure regions, zone must be None. - result = result[result['AvailabilityZone'] == zone] + result = _filter_region_zone(result, region, zone) if len(result) == 0: return ([], []) @@ -569,8 +576,7 @@ def get_image_id_from_tag_impl(df: pd.DataFrame, tag: str, an image that matches the tag. 
""" df = df[df['Tag'] == tag] - if region is not None: - df = df[df['Region'].str.lower() == region.lower()] + df = _filter_region_zone(df, region, zone=None) assert len(df) <= 1, ('Multiple images found for tag ' f'{tag} in region {region}') if len(df) == 0: @@ -585,7 +591,6 @@ def is_image_tag_valid_impl(df: pd.DataFrame, tag: str, region: Optional[str]) -> bool: """Returns True if the image tag is valid.""" df = df[df['Tag'] == tag] - if region is not None: - df = df[df['Region'].str.lower() == region.lower()] + df = _filter_region_zone(df, region, zone=None) df = df.dropna(subset=['ImageId']) return len(df) > 0 diff --git a/sky/clouds/service_catalog/gcp_catalog.py b/sky/clouds/service_catalog/gcp_catalog.py index 3558e8b7f0a..e3f32a8ceef 100644 --- a/sky/clouds/service_catalog/gcp_catalog.py +++ b/sky/clouds/service_catalog/gcp_catalog.py @@ -398,12 +398,17 @@ def get_region_zones_for_accelerators( return common.get_region_zones(df, use_spot) -def check_host_accelerator_compatibility( - instance_type: str, accelerators: Optional[Dict[str, int]]) -> None: - """Check if the instance type is compatible with the accelerators. +def check_accelerator_attachable_to_host(instance_type: str, + accelerators: Optional[Dict[str, int]], + zone: Optional[str] = None) -> None: + """Check if the accelerators can be attached to the host. + + This function checks the max CPU count and memory of the host that + the accelerators can be attached to. - This function ensures that TPUs and GPUs except A100 are attached to N1, - and A100 GPUs are attached to A2 machines. + Raises: + exceptions.ResourcesMismatchError: If the accelerators cannot be + attached to the host. """ if accelerators is None: if instance_type.startswith('a2-'): @@ -418,12 +423,12 @@ def check_host_accelerator_compatibility( acc = list(accelerators.items()) assert len(acc) == 1, acc - acc_name, _ = acc[0] + acc_name, acc_count = acc[0] # Check if the accelerator is supported by GCP. if not list_accelerators(gpus_only=False, name_filter=acc_name): with ux_utils.print_exception_no_traceback(): - raise exceptions.ResourcesUnavailableError( + raise exceptions.ResourcesMismatchError( f'{acc_name} is not available in GCP. ' 'See \'sky show-gpus --cloud gcp\'') @@ -438,45 +443,22 @@ def check_host_accelerator_compatibility( # Treat A100 as a special case. if acc_name in _A100_INSTANCE_TYPE_DICTS: - # A100 must be attached to A2 instance type. - if not instance_type.startswith('a2-'): + a100_instance_type = _A100_INSTANCE_TYPE_DICTS[acc_name][acc_count] + if instance_type != a100_instance_type: with ux_utils.print_exception_no_traceback(): raise exceptions.ResourcesMismatchError( - f'A100 GPUs cannot be attached to {instance_type}. ' - f'Use A2 machines instead. Please refer to ' + f'A100:{acc_count} cannot be attached to {instance_type}. ' + f'Use {a100_instance_type} instead. Please refer to ' 'https://cloud.google.com/compute/docs/gpus#a100-gpus') - return - - # Other GPUs must be attached to N1 machines. - # Refer to: https://cloud.google.com/compute/docs/machine-types#gpus - if not instance_type.startswith('n1-'): + elif not instance_type.startswith('n1-'): + # Other GPUs must be attached to N1 machines. + # Refer to: https://cloud.google.com/compute/docs/machine-types#gpus with ux_utils.print_exception_no_traceback(): raise exceptions.ResourcesMismatchError( f'{acc_name} GPUs cannot be attached to {instance_type}. ' 'Use N1 instance types instead. 
Please refer to: ' 'https://cloud.google.com/compute/docs/machine-types#gpus') - -def check_accelerator_attachable_to_host(instance_type: str, - accelerators: Optional[Dict[str, int]], - zone: Optional[str] = None) -> None: - """Check if the accelerators can be attached to the host. - - This function checks the max CPU count and memory of the host that - the accelerators can be attached to. - """ - if accelerators is None: - return - - acc = list(accelerators.items()) - assert len(acc) == 1, acc - acc_name, acc_count = acc[0] - - if acc_name.startswith('tpu-'): - # TODO(woosuk): Check max vCPUs and memory for each TPU type. - assert instance_type == 'TPU-VM' or instance_type.startswith('n1-') - return - if acc_name in _A100_INSTANCE_TYPE_DICTS: valid_counts = list(_A100_INSTANCE_TYPE_DICTS[acc_name].keys()) else: @@ -488,25 +470,20 @@ def check_accelerator_attachable_to_host(instance_type: str, f'{acc_name}:{acc_count} is not launchable on GCP. ' f'The valid {acc_name} counts are {valid_counts}.') - if acc_name in _A100_INSTANCE_TYPE_DICTS: - a100_instance_type = _A100_INSTANCE_TYPE_DICTS[acc_name][acc_count] - if instance_type != a100_instance_type: - with ux_utils.print_exception_no_traceback(): - raise exceptions.ResourcesMismatchError( - f'A100:{acc_count} cannot be attached to {instance_type}. ' - f'Use {a100_instance_type} instead. Please refer to ' - 'https://cloud.google.com/compute/docs/gpus#a100-gpus') - return - # Check maximum vCPUs and memory. - max_cpus, max_memory = _NUM_ACC_TO_MAX_CPU_AND_MEMORY[acc_name][acc_count] - if acc_name == 'K80' and acc_count == 8: - if zone in ['asia-east1-a', 'us-east1-d']: - max_memory = 416 - elif acc_name == 'P100' and acc_count == 4: - if zone in ['us-east1-c', 'europe-west1-d', 'europe-west1-b']: - max_cpus = 64 - max_memory = 208 + if acc_name in _A100_INSTANCE_TYPE_DICTS: + max_cpus, max_memory = get_vcpus_mem_from_instance_type( + a100_instance_type) + else: + max_cpus, max_memory = _NUM_ACC_TO_MAX_CPU_AND_MEMORY[acc_name][ + acc_count] + if acc_name == 'K80' and acc_count == 8: + if zone in ['asia-east1-a', 'us-east1-d']: + max_memory = 416 + elif acc_name == 'P100' and acc_count == 4: + if zone in ['us-east1-c', 'europe-west1-d', 'europe-west1-b']: + max_cpus = 64 + max_memory = 208 # vCPU counts and memory sizes of N1 machines. 
df = _df[_df['InstanceType'] == instance_type] diff --git a/sky/execution.py b/sky/execution.py index 810a4980a20..4abaf886ac6 100644 --- a/sky/execution.py +++ b/sky/execution.py @@ -328,24 +328,24 @@ def _execute( cluster_name=cluster_name, retry_until_up=retry_until_up) - if dryrun: - logger.info('Dry run finished.') + if dryrun and handle is None: + logger.info('Dryrun finished.') return - if Stage.SYNC_WORKDIR in stages: + if Stage.SYNC_WORKDIR in stages and not dryrun: if task.workdir is not None: backend.sync_workdir(handle, task.workdir) - if Stage.SYNC_FILE_MOUNTS in stages: + if Stage.SYNC_FILE_MOUNTS in stages and not dryrun: backend.sync_file_mounts(handle, task.file_mounts, task.storage_mounts) if no_setup: logger.info('Setup commands skipped.') - elif Stage.SETUP in stages: + elif Stage.SETUP in stages and not dryrun: backend.setup(handle, task, detach_setup=detach_setup) - if Stage.PRE_EXEC in stages: + if Stage.PRE_EXEC in stages and not dryrun: if idle_minutes_to_autostop is not None: assert isinstance(backend, backends.CloudVmRayBackend) backend.set_autostop(handle, @@ -355,12 +355,12 @@ def _execute( if Stage.EXEC in stages: try: global_user_state.update_last_use(handle.get_cluster_name()) - backend.execute(handle, task, detach_run) + backend.execute(handle, task, detach_run, dryrun=dryrun) finally: # Enables post_execute() to be run after KeyboardInterrupt. backend.post_execute(handle, down) - if Stage.DOWN in stages: + if Stage.DOWN in stages and not dryrun: if down and idle_minutes_to_autostop is None: backend.teardown_ephemeral_storage(task) backend.teardown(handle, terminate=True) @@ -570,7 +570,8 @@ def exec( # pylint: disable=redefined-builtin handle = backend_utils.check_cluster_available( cluster_name, operation='executing tasks', - check_cloud_vm_ray_backend=False) + check_cloud_vm_ray_backend=False, + dryrun=dryrun) _execute(entrypoint=entrypoint, dryrun=dryrun, down=down, diff --git a/sky/optimizer.py b/sky/optimizer.py index 33b570121b2..185708ac71a 100644 --- a/sky/optimizer.py +++ b/sky/optimizer.py @@ -273,7 +273,7 @@ def _estimate_nodes_cost_or_time( error_msg = ( 'No launchable resource found for task ' f'{node}.{location_hint}\nThis means the ' - 'catalog does not contain any instance types that ' + 'catalog does not contain any resources that ' 'satisfy this request.\n' 'To fix: relax or change the resource requirements.\n' 'Hint: \'sky show-gpus --all\' ' @@ -954,30 +954,6 @@ def _fill_in_launchable_resources( f'{colorama.Style.BRIGHT}' f'sky check {colorama.Style.RESET_ALL}, or change the ' 'cloud requirement') - elif resources.is_launchable(): - if isinstance(resources.cloud, clouds.GCP): - # Check if the host VM satisfies the max vCPU and memory limits. - clouds.GCP.check_accelerator_attachable_to_host( - resources.instance_type, resources.accelerators, - resources.zone) - # If the user has specified a GCP zone and the zone does not support - # the host-accelerator combination, then an error will be raised by - # the above check_accelerator_attachable_to_host() call. - # If the user has not specified any zone, a launchable will be made - # for every zone even if some of the zones do not support the - # host-accelerator combination. Then the provisioner may try to - # launch the instance, and fail over to other zones. We find this - # behavior acceptable because this will happen only when the user - # requested GCP 4:P100 or 8:K80 with a very large host VM. 
- elif isinstance(resources.cloud, clouds.SCP): - # Check if the host VM satisfies the min/max disk size limits. - is_allowed, launchable[resources] = \ - clouds.SCP.is_disk_size_allowed(resources) - if not is_allowed: - continue - - launchable[resources] = _make_launchables_for_valid_region_zones( - resources) else: clouds_list = ([resources.cloud] if resources.cloud is not None else enabled_clouds) diff --git a/sky/resources.py b/sky/resources.py index f306c96d7ce..907789d9cef 100644 --- a/sky/resources.py +++ b/sky/resources.py @@ -162,7 +162,6 @@ def __init__( self._try_validate_local() self._try_validate_instance_type() self._try_validate_cpus_mem() - self._try_validate_accelerators() self._try_validate_spot() self._try_validate_image_id() self._try_validate_disk_tier() @@ -505,7 +504,8 @@ def get_valid_regions_for_launchable(self) -> List[clouds.Region]: may have restricted the regions to be considered (e.g., a ssh_proxy_command dict with region names as keys). """ - assert self.is_launchable() + assert self.is_launchable(), self + regions = self._cloud.regions_with_offering(self._instance_type, self.accelerators, self._use_spot, @@ -624,56 +624,6 @@ def _try_validate_cpus_mem(self) -> None: f'memory. {self.instance_type} has {mem} GB ' f'memory, but {self.memory} is requested.') - def _try_validate_accelerators(self) -> None: - """Validate accelerators against the instance type and region/zone.""" - acc_requested = self.accelerators - if (isinstance(self.cloud, clouds.GCP) and - self.instance_type is not None): - # Do this check even if acc_requested is None. - clouds.GCP.check_host_accelerator_compatibility( - self.instance_type, acc_requested) - - if acc_requested is None: - return - - if self.is_launchable() and not isinstance(self.cloud, clouds.GCP): - # GCP attaches accelerators to VMs, so no need for this check. - acc_requested = self.accelerators - acc_from_instance_type = ( - self.cloud.get_accelerators_from_instance_type( - self._instance_type)) - if not Resources(accelerators=acc_requested).less_demanding_than( - Resources(accelerators=acc_from_instance_type)): - with ux_utils.print_exception_no_traceback(): - raise ValueError( - 'Infeasible resource demands found:\n' - f' Instance type requested: {self._instance_type}\n' - f' Accelerators for {self._instance_type}: ' - f'{acc_from_instance_type}\n' - f' Accelerators requested: {acc_requested}\n' - f'To fix: either only specify instance_type, or change ' - 'the accelerators field to be consistent.') - # NOTE: should not clear 'self.accelerators' even for AWS/Azure, - # because e.g., the instance may have 4 GPUs, while the task - # specifies to use 1 GPU. - - # Validate whether accelerator is available in specified region/zone. - acc, acc_count = list(acc_requested.items())[0] - # Fractional accelerators are temporarily bumped up to 1. 
- if 0 < acc_count < 1: - acc_count = 1 - if self.region is not None or self.zone is not None: - if not self._cloud.accelerator_in_region_or_zone( - acc, acc_count, self.region, self.zone): - error_str = (f'Accelerator "{acc}" is not available in ' - '"{}".') - if self.zone: - error_str = error_str.format(self.zone) - else: - error_str = error_str.format(self.region) - with ux_utils.print_exception_no_traceback(): - raise ValueError(error_str) - def _try_validate_spot(self) -> None: if self._spot_recovery is None: return diff --git a/sky/spot/recovery_strategy.py b/sky/spot/recovery_strategy.py index c2b1caa31ba..7cd3958a3e3 100644 --- a/sky/spot/recovery_strategy.py +++ b/sky/spot/recovery_strategy.py @@ -285,7 +285,8 @@ def _launch(self, _is_launched_by_spot_controller=True) logger.info('Spot cluster launched.') except (exceptions.InvalidClusterNameError, - exceptions.NoCloudAccessError) as e: + exceptions.NoCloudAccessError, + exceptions.ResourcesMismatchError) as e: logger.error('Failure happened before provisioning. ' f'{common_utils.format_exception(e)}') if raise_on_failure: diff --git a/tests/conftest.py b/tests/conftest.py index fe26cfc7be0..7976bf10132 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,6 +2,8 @@ import tempfile from typing import List +import pandas as pd + # Usage: use # @pytest.mark.slow # to mark a test as slow and to skip by default. @@ -194,6 +196,18 @@ def enable_all_clouds(monkeypatch): config_file_backup.name) monkeypatch.setenv('OCI_CONFIG', config_file_backup.name) + az_mappings = pd.read_csv('tests/default_aws_az_mappings.csv') + + def _get_az_mappings(_): + return az_mappings + + monkeypatch.setattr( + 'sky.clouds.service_catalog.aws_catalog._get_az_mappings', + _get_az_mappings) + + monkeypatch.setattr('sky.backends.backend_utils.check_owner_identity', + lambda _: None) + @pytest.fixture def aws_config_region(monkeypatch) -> str: diff --git a/tests/default_aws_az_mappings.csv b/tests/default_aws_az_mappings.csv new file mode 100644 index 00000000000..03b824acee2 --- /dev/null +++ b/tests/default_aws_az_mappings.csv @@ -0,0 +1,71 @@ +AvailabilityZoneName,AvailabilityZone +eu-west-2a,euw2-az2 +eu-west-2b,euw2-az3 +eu-west-2c,euw2-az1 +ap-south-1a,aps1-az1 +ap-south-1b,aps1-az3 +ap-south-1c,aps1-az2 +us-east-1a,use1-az2 +us-east-1b,use1-az4 +us-east-1c,use1-az6 +us-east-1d,use1-az1 +us-east-1e,use1-az3 +us-east-1f,use1-az5 +ap-southeast-3a,apse3-az1 +ap-southeast-3b,apse3-az2 +ap-southeast-3c,apse3-az3 +ca-central-1a,cac1-az1 +ca-central-1b,cac1-az2 +ca-central-1d,cac1-az4 +us-east-2a,use2-az1 +us-east-2b,use2-az2 +us-east-2c,use2-az3 +eu-north-1a,eun1-az1 +eu-north-1b,eun1-az2 +eu-north-1c,eun1-az3 +eu-west-1a,euw1-az3 +eu-west-1b,euw1-az1 +eu-west-1c,euw1-az2 +eu-west-3a,euw3-az1 +eu-west-3b,euw3-az2 +eu-west-3c,euw3-az3 +eu-central-1a,euc1-az2 +eu-central-1b,euc1-az3 +eu-central-1c,euc1-az1 +ap-northeast-2a,apne2-az1 +ap-northeast-2b,apne2-az2 +ap-northeast-2c,apne2-az3 +ap-northeast-2d,apne2-az4 +me-south-1a,mes1-az1 +me-south-1b,mes1-az2 +me-south-1c,mes1-az3 +ap-northeast-3a,apne3-az3 +ap-northeast-3b,apne3-az1 +ap-northeast-3c,apne3-az2 +us-west-2a,usw2-az1 +us-west-2b,usw2-az2 +us-west-2c,usw2-az3 +us-west-2d,usw2-az4 +eu-south-1a,eus1-az1 +eu-south-1b,eus1-az2 +eu-south-1c,eus1-az3 +me-central-1a,mec1-az1 +me-central-1b,mec1-az2 +me-central-1c,mec1-az3 +ap-east-1a,ape1-az1 +ap-east-1b,ape1-az2 +ap-east-1c,ape1-az3 +af-south-1a,afs1-az1 +af-south-1b,afs1-az2 +af-south-1c,afs1-az3 +ap-southeast-1a,apse1-az2 +ap-southeast-1b,apse1-az1 
+ap-southeast-1c,apse1-az3 +ap-northeast-1a,apne1-az4 +ap-northeast-1c,apne1-az1 +ap-northeast-1d,apne1-az2 +ap-southeast-2a,apse2-az1 +ap-southeast-2b,apse2-az3 +ap-southeast-2c,apse2-az2 +us-west-1a,usw1-az1 +us-west-1c,usw1-az3 diff --git a/tests/test_cli.py b/tests/test_cli.py index 4a7dcba7a27..4acc29d4e4b 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -4,6 +4,7 @@ from click import testing as cli_testing import sky +from sky import exceptions import sky.cli as cli CLOUDS_TO_TEST = ['aws', 'gcp', 'ibm', 'azure', 'lambda', 'scp', 'oci'] @@ -54,7 +55,7 @@ def test_accelerator_mismatch(enable_all_clouds): def _capture_mismatch_gpus_spec(file_path, gpus: str): result = cli_runner.invoke(cli.launch, [file_path, '--gpus', gpus, '--dryrun']) - assert isinstance(result.exception, ValueError) + assert isinstance(result.exception, exceptions.ResourcesMismatchError) assert 'Infeasible resource demands found:' in str(result.exception) def _capture_match_gpus_spec(file_path, gpus: str): @@ -70,10 +71,10 @@ def _capture_match_gpus_spec(file_path, gpus: str): _capture_mismatch_gpus_spec(f.name, 'T4:0.5') _capture_mismatch_gpus_spec(f.name, 'V100:2') _capture_mismatch_gpus_spec(f.name, 'v100:2') + _capture_mismatch_gpus_spec(f.name, 'V100:0.5') _capture_match_gpus_spec(f.name, 'V100:1') _capture_match_gpus_spec(f.name, 'v100:1') - _capture_match_gpus_spec(f.name, 'V100:0.5') _capture_match_gpus_spec(f.name, 'V100') diff --git a/tests/test_jobs.py b/tests/test_jobs.py new file mode 100644 index 00000000000..d61a851c486 --- /dev/null +++ b/tests/test_jobs.py @@ -0,0 +1,136 @@ +import pytest + +import sky +from sky import backends +from sky import exceptions +from sky import global_user_state +from sky.utils import db_utils + + +class TestExecutionOnExistingClusters: + """Test operations on existing clusters.""" + + @pytest.fixture + def _mock_db_conn(self, monkeypatch, tmp_path): + tmp_path.mkdir(parents=True, exist_ok=True) + db_path = tmp_path / 'state_test_jobs.db' + monkeypatch.setattr( + global_user_state, '_DB', + db_utils.SQLiteConn(str(db_path), global_user_state.create_table)) + + @pytest.fixture + def _mock_cluster_state(self, _mock_db_conn, enable_all_clouds): + """Add clusters to the global state.
+ + This fixture adds three clusters to the global state: + - test-cluster1: AWS, 2x p4d.24xlarge (8x A100) + - test-cluster2: GCP, 1x n1-highmem-64, 4x V100 + - test-cluster3: Azure, 1x Standard_D4s_v3 (CPU only) + """ + assert 'state.db' not in global_user_state._DB.db_path + + handle = backends.CloudVmRayResourceHandle( + cluster_name='test-cluster1', + cluster_yaml='/tmp/cluster1.yaml', + launched_nodes=2, + launched_resources=sky.Resources(sky.AWS(), + instance_type='p4d.24xlarge', + region='us-east-1', + zone='us-east-1a'), + ) + global_user_state.add_or_update_cluster( + 'test-cluster1', + handle, + requested_resources={handle.launched_resources}, + ready=True) + handle = backends.CloudVmRayResourceHandle( + cluster_name='test-cluster2', + cluster_yaml='/tmp/cluster2.yaml', + launched_nodes=1, + launched_resources=sky.Resources(sky.GCP(), + instance_type='n1-highmem-64', + accelerators='V100:4', + region='us-west1', + zone='us-west1-a'), + ) + global_user_state.add_or_update_cluster( + 'test-cluster2', + handle, + requested_resources={handle.launched_resources}, + ready=True) + handle = backends.CloudVmRayResourceHandle( + cluster_name='test-cluster3', + cluster_yaml='/tmp/cluster3.yaml', + launched_nodes=1, + launched_resources=sky.Resources(sky.Azure(), + instance_type='Standard_D4s_v3', + region='eastus'), + ) + global_user_state.add_or_update_cluster( + 'test-cluster3', + handle, + requested_resources={handle.launched_resources}, + ready=False) + + def test_launch_exec(self, _mock_cluster_state, monkeypatch): + """Test launch and exec on existing clusters. + + This test runs launch and exec with resources less demanding + than those of the existing clusters, so the check passes. + """ + task = sky.Task(run='echo hi') + task.set_resources(sky.Resources(accelerators='A100:8')) + sky.launch(task, cluster_name='test-cluster1', dryrun=True) + sky.exec(task, cluster_name='test-cluster1', dryrun=True) + task.set_resources(sky.Resources(accelerators='A100:3')) + sky.launch(task, cluster_name='test-cluster1', dryrun=True) + sky.exec(task, cluster_name='test-cluster1', dryrun=True) + task.set_resources( + sky.Resources( + sky.AWS(), + accelerators='A100:1', + region='us-east-1', + )) + sky.launch(task, cluster_name='test-cluster1', dryrun=True) + sky.exec(task, cluster_name='test-cluster1', dryrun=True) + + task = sky.Task(run='echo hi') + task.set_resources(sky.Resources(accelerators='V100:4')) + sky.launch(task, cluster_name='test-cluster2', dryrun=True) + sky.exec(task, cluster_name='test-cluster2', dryrun=True) + task.set_resources( + sky.Resources(sky.GCP(), accelerators='V100:3', region='us-west1')) + sky.launch(task, cluster_name='test-cluster2', dryrun=True) + sky.exec(task, cluster_name='test-cluster2', dryrun=True) + + task = sky.Task(run='echo hi') + sky.exec(task, cluster_name='test-cluster3', dryrun=True) + + def _run_launch_exec_with_error(self, task, cluster_name): + with pytest.raises(exceptions.ResourcesMismatchError) as e: + sky.launch(task, cluster_name=cluster_name, dryrun=True) + assert 'do not match the existing cluster.' in str(e.value), str( + e.value) + with pytest.raises(exceptions.ResourcesMismatchError) as e: + sky.exec(task, cluster_name=cluster_name, dryrun=True) + assert 'do not match the existing cluster.'
in str(e.value), str( + e.value) + + def test_launch_exec_mismatch(self, _mock_cluster_state, monkeypatch): + """Test launch and exec on existing clusters with mismatched resources.""" + task = sky.Task(run='echo hi') + # Accelerators mismatch + task.set_resources(sky.Resources(accelerators='V100:8')) + self._run_launch_exec_with_error(task, 'test-cluster1') + self._run_launch_exec_with_error(task, 'test-cluster2') + + task.set_resources(sky.Resources(accelerators='A100:8')) + self._run_launch_exec_with_error(task, 'test-cluster2') + self._run_launch_exec_with_error(task, 'test-cluster3') + + # Cloud mismatch + task.set_resources(sky.Resources(sky.AWS(), accelerators='V100')) + self._run_launch_exec_with_error(task, 'test-cluster2') + + task.set_resources(sky.Resources(sky.GCP())) + self._run_launch_exec_with_error(task, 'test-cluster1') diff --git a/tests/test_optimizer_dryruns.py b/tests/test_optimizer_dryruns.py index dc7b11768c6..8bfa4944a54 100644 --- a/tests/test_optimizer_dryruns.py +++ b/tests/test_optimizer_dryruns.py @@ -329,13 +329,34 @@ def test_instance_type_mistmatches_accelerators(monkeypatch): ('m4.2xlarge', 'V100'), ] for instance, acc in bad_instance_and_accs: - with pytest.raises(ValueError) as e: + with pytest.raises(exceptions.ResourcesMismatchError) as e: _test_resources_launch(monkeypatch, sky.AWS(), instance_type=instance, accelerators=acc) assert 'Infeasible resource demands found' in str(e.value) + with pytest.raises(exceptions.ResourcesMismatchError) as e: + _test_resources_launch(monkeypatch, + sky.GCP(), + instance_type='n2-standard-8', + accelerators={'V100': 1}) + assert 'can only be attached to N1 VMs,' in str(e.value), str(e.value) + + with pytest.raises(exceptions.ResourcesMismatchError) as e: + _test_resources_launch(monkeypatch, + sky.GCP(), + instance_type='a2-highgpu-1g', + accelerators={'A100': 2}) + assert 'cannot be attached to' in str(e.value), str(e.value) + + with pytest.raises(exceptions.ResourcesMismatchError) as e: + _test_resources_launch(monkeypatch, + sky.AWS(), + instance_type='p3.16xlarge', + accelerators={'V100': 1}) + assert 'Infeasible resource demands found' in str(e.value) + def test_instance_type_matches_accelerators(monkeypatch): _test_resources_launch(monkeypatch, @@ -346,11 +367,20 @@ def test_instance_type_matches_accelerators(monkeypatch): sky.GCP(), instance_type='n1-standard-2', accelerators='V100') - # Partial use: Instance has 8 V100s, while the task needs 1 of them. 
+ + _test_resources_launch(monkeypatch, + sky.GCP(), + instance_type='n1-standard-8', + accelerators='tpu-v3-8') + _test_resources_launch(monkeypatch, + sky.GCP(), + instance_type='a2-highgpu-1g', + accelerators='a100') + _test_resources_launch(monkeypatch, sky.AWS(), instance_type='p3.16xlarge', - accelerators={'V100': 1}) + accelerators={'V100': 8}) def test_invalid_instance_type(monkeypatch): @@ -402,6 +432,13 @@ def test_invalid_region(monkeypatch): _test_resources(monkeypatch, cloud, region='invalid') assert 'Invalid region' in str(e.value) + with pytest.raises(exceptions.ResourcesUnavailableError) as e: + _test_resources_launch(monkeypatch, + sky.GCP(), + region='us-west1', + accelerators='tpu-v3-8') + assert 'No launchable resource found' in str(e.value) + def test_invalid_zone(monkeypatch): for cloud in [sky.AWS(), sky.GCP()]: @@ -599,3 +636,16 @@ def test_parse_valid_envs_yaml(monkeypatch): GOOD123: 123 """) _test_parse_task_yaml(spec) + + +def test_invalid_accelerators_regions(enable_all_clouds, monkeypatch): + task = sky.Task(run='echo hi') + task.set_resources( + sky.Resources( + sky.AWS(), + accelerators='A100:8', + region='us-west-1', + )) + with pytest.raises(exceptions.ResourcesUnavailableError) as e: + sky.launch(task, cluster_name='should-fail', dryrun=True) + assert 'No launchable resource found for' in str(e.value), str(e.value)
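
Note (not part of the patch): a minimal sketch of the dryrun-on-existing-cluster flow this change enables, mirroring the fixtures in tests/test_jobs.py above; the cluster name and resources are illustrative and assume a matching cluster record already exists in the local state:

    import sky

    task = sky.Task(run='echo hi')
    task.set_resources(sky.Resources(accelerators='A100:8'))

    # With dryrun=True, check_cluster_available() returns the recorded handle
    # without refreshing cloud state, and _execute() short-circuits after
    # logging 'Dryrun complete. Would have run: ...' -- no job is submitted.
    sky.launch(task, cluster_name='test-cluster1', dryrun=True)
    sky.exec(task, cluster_name='test-cluster1', dryrun=True)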
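Note (not part of the patch): a sketch of how the relocated accelerator/instance-type check surfaces to callers, per the tests above; the cluster name is illustrative. p3.16xlarge comes with V100:8, so requesting V100:1 against it is now rejected as a mismatch at optimization time rather than silently treated as partial use:

    import sky
    from sky import exceptions

    task = sky.Task(run='echo hi')
    task.set_resources(
        sky.Resources(sky.AWS(),
                      instance_type='p3.16xlarge',
                      accelerators={'V100': 1}))
    try:
        sky.launch(task, cluster_name='demo', dryrun=True)
    except exceptions.ResourcesMismatchError as e:
        print(e)  # 'Infeasible resource demands found: ...'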