From 396d5c57b40fef30418c07c4ac146de5bb3808d9 Mon Sep 17 00:00:00 2001 From: Tasko Olevski Date: Tue, 16 May 2023 01:08:26 +0200 Subject: [PATCH] feat(app): use crac service --- .../api/amalthea_patches/init_containers.py | 2 +- renku_notebooks/api/classes/crac.py | 98 +++++++++++ renku_notebooks/api/classes/server.py | 38 ++--- renku_notebooks/api/classes/user.py | 5 + renku_notebooks/api/notebooks.py | 56 ++++-- renku_notebooks/api/schemas/server_options.py | 160 ++++++++++++------ renku_notebooks/api/schemas/servers_post.py | 2 + renku_notebooks/config/__init__.py | 2 + renku_notebooks/errors/user.py | 11 ++ tests/unit/test_server_class/test_manifest.py | 16 +- 10 files changed, 289 insertions(+), 101 deletions(-) create mode 100644 renku_notebooks/api/classes/crac.py diff --git a/renku_notebooks/api/amalthea_patches/init_containers.py b/renku_notebooks/api/amalthea_patches/init_containers.py index 68dbabe28..10b205e2e 100644 --- a/renku_notebooks/api/amalthea_patches/init_containers.py +++ b/renku_notebooks/api/amalthea_patches/init_containers.py @@ -26,7 +26,7 @@ def git_clone(server: "UserServer"): }, { "name": "GIT_CLONE_LFS_AUTO_FETCH", - "value": "1" if server.server_options["lfs_auto_fetch"] else "0", + "value": "1" if server.server_options.lfs_auto_fetch else "0", }, {"name": "GIT_CLONE_COMMIT_SHA", "value": server.commit_sha}, {"name": "GIT_CLONE_BRANCH", "value": server.branch}, diff --git a/renku_notebooks/api/classes/crac.py b/renku_notebooks/api/classes/crac.py new file mode 100644 index 000000000..60fdda347 --- /dev/null +++ b/renku_notebooks/api/classes/crac.py @@ -0,0 +1,98 @@ +from dataclasses import dataclass +from typing import Any, Dict, Optional + +import requests + +from ...errors.intermittent import IntermittentError +from ...errors.programming import ConfigurationError +from ...errors.user import InvalidComputeResourceError +from ..schemas.server_options import ServerOptions +from .user import User + + +@dataclass +class CRACValidator: + """Calls to the CRAC service to validate resource requests.""" + + crac_url: str + + def __post_init__(self): + self.crac_url = self.crac_url.rstrip("/") + + def validate_class_storage( + self, + user: User, + class_id: int, + storage: Optional[int] = None, + ) -> ServerOptions: + """Ensures that the resource class and storage requested is valid.""" + headers = None + if user.access_token is not None: + headers = {"Authorization": f"bearer {user.access_token}"} + res = requests.get(self.crac_url + f"/classes/{class_id}", headers=headers) + if res.status_code != 200: + raise InvalidComputeResourceError( + message="The requested resource class does not exist or you do not " + "have the required permissions to access it." + ) + res_class = res.json() + if storage is None: + storage = res_class.get("default_storage", 1) + if storage < 1: + raise InvalidComputeResourceError( + message="Storage requests have to be greater than or equal to 1GB." + ) + if storage > res_class.get("max_storage"): + raise InvalidComputeResourceError( + message="The requested storage surpasses the maximum value allowed." + ) + # Memory and disk space in CRAC are assumed to be in gigabytes whereas + # the notebook service assumes that if a plain number is used then it is bytes. + options = ServerOptions.from_resource_class(res_class) + options.storage = storage * 1000000000 + return options + + def get_default_class(self) -> Dict[str, Any]: + res = requests.get(self.crac_url + "/resource_pools") + if res.status_code != 200: + raise IntermittentError( + "The CRAC sent an unexpected response, please try again later." + ) + pools = res.json() + default_pools = [p for p in pools if p.get("default", False)] + if len(default_pools) < 1: + raise ConfigurationError("Cannot find the default resource pool.") + default_pool = default_pools[0] + return default_pool + + def find_acceptable_class( + self, user: User, requested_server_options: ServerOptions + ) -> Optional[ServerOptions]: + """Find a resource class that is available to the user that is greater than or equal to + the old-style server options that the user requested.""" + headers = None + if user.access_token is not None: + headers = {"Authorization": f"bearer {user.access_token}"} + res = requests.get(self.crac_url + "/resource_pools", headers=headers) + if res.status_code != 200: + raise IntermittentError( + message="The compute resource access control service sent " + "an unexpected response, please try again later", + ) + resource_pools = res.json() + # Difference and best candidate in the case that the resource class will be + # greater than or equal to the request + best_larger_or_equal_diff = None + best_larger_or_equal_class = None + zero_diff = ServerOptions(cpu=0, memory=0, gpu=0, storage=0) + for resource_pool in resource_pools: + for resource_class in resource_pool["classes"]: + resource_class_mdl = ServerOptions.from_resource_class(resource_class) + diff = resource_class_mdl - requested_server_options + if diff >= zero_diff and ( + best_larger_or_equal_diff is None + or diff < best_larger_or_equal_diff + ): + best_larger_or_equal_diff = diff + best_larger_or_equal_class = resource_class + return best_larger_or_equal_class diff --git a/renku_notebooks/api/classes/server.py b/renku_notebooks/api/classes/server.py index 5c1d90fdc..e923bcedb 100644 --- a/renku_notebooks/api/classes/server.py +++ b/renku_notebooks/api/classes/server.py @@ -21,6 +21,7 @@ from ...errors.user import MissingResourceError from .k8s_client import K8sClient from .cloud_storage import ICloudStorageRequest +from ..schemas.server_options import ServerOptions from .user import RegisteredUser, User from ...util.check_image import ( get_docker_token, @@ -43,7 +44,7 @@ def __init__( commit_sha: str, notebook: Optional[str], # TODO: Is this value actually needed? image: Optional[str], - server_options: Dict[str, Any], + server_options: ServerOptions, environment_variables: Dict[str, str], cloudstorage: List[ICloudStorageRequest], k8s_client: K8sClient, @@ -221,40 +222,21 @@ def _get_registry_secret(self, b64encode=True): return output def _get_session_k8s_resources(self): - cpu_request = float(self.server_options["cpu_request"]) - mem = self.server_options["mem_request"] - gpu_req = self.server_options.get("gpu_request", {}) - gpu = {"nvidia.com/gpu": str(gpu_req)} if gpu_req else None + cpu_request = float(self.server_options.cpu) + mem = self.server_options.memory + gpu_req = self.server_options.gpu + gpu = {"nvidia.com/gpu": str(gpu_req)} if gpu_req > 0 else None resources = { "requests": {"memory": mem, "cpu": cpu_request}, "limits": {"memory": mem}, } if config.sessions.enforce_cpu_limits == "lax": - if "cpu_request" in config.server_options.ui_choices: - resources["limits"]["cpu"] = max( - config.server_options.ui_choices["cpu_request"]["options"] - ) - else: - resources["limits"]["cpu"] = cpu_request + resources["limits"]["cpu"] = 2 * cpu_request elif config.sessions.enforce_cpu_limits == "strict": resources["limits"]["cpu"] = cpu_request if gpu: resources["requests"] = {**resources["requests"], **gpu} resources["limits"] = {**resources["limits"], **gpu} - if "ephemeral-storage" in self.server_options.keys(): - ephemeral_storage = ( - self.server_options["ephemeral-storage"] - if config.sessions.storage.pvs_enabled - else self.server_options["disk_request"] - ) - resources["requests"] = { - **resources["requests"], - "ephemeral-storage": ephemeral_storage, - } - resources["limits"] = { - **resources["limits"], - "ephemeral-storage": ephemeral_storage, - } return resources def _get_session_manifest(self): @@ -290,7 +272,7 @@ def _get_session_manifest(self): # Storage if config.sessions.storage.pvs_enabled: storage = { - "size": self.server_options["disk_request"], + "size": self.server_options.storage, "pvc": { "enabled": True, "storageClassName": config.sessions.storage.pvs_storage_class, @@ -299,7 +281,7 @@ def _get_session_manifest(self): } else: storage = { - "size": self.server_options["disk_request"] + "size": self.server_options.storage if config.sessions.storage.use_empty_dir_size_limit else "", "pvc": { @@ -348,7 +330,7 @@ def _get_session_manifest(self): ), }, "jupyterServer": { - "defaultUrl": self.server_options["defaultUrl"], + "defaultUrl": self.server_options.default_url, "image": self.verified_image, "rootDir": self.image_workdir.rstrip("/") + f"/work/{self.gl_project.path}/", diff --git a/renku_notebooks/api/classes/user.py b/renku_notebooks/api/classes/user.py index 627bccfbd..8c76cc21a 100644 --- a/renku_notebooks/api/classes/user.py +++ b/renku_notebooks/api/classes/user.py @@ -19,6 +19,11 @@ class User(ABC): + @property + @abstractmethod + def access_token(self) -> Optional[str]: + return None + @abstractmethod def get_autosaves(self, *args, **kwargs): pass diff --git a/renku_notebooks/api/notebooks.py b/renku_notebooks/api/notebooks.py index cf5b8b6c7..6b9890034 100644 --- a/renku_notebooks/api/notebooks.py +++ b/renku_notebooks/api/notebooks.py @@ -16,29 +16,26 @@ # See the License for the specific language governing permissions and # limitations under the License. """Notebooks service API.""" -from flask import Blueprint, current_app, make_response, jsonify +from flask import Blueprint, current_app, jsonify, make_response from marshmallow import fields, validate from webargs.flaskparser import use_args -from renku_notebooks.util.check_image import ( - get_docker_token, - image_exists, - parse_image_name, -) - from ..config import config +from ..errors.user import ImageParseError, MissingResourceError, UserInputError +from ..util.check_image import get_docker_token, image_exists, parse_image_name +from ..util.kubernetes_ import make_server_name from .auth import authenticated +from .classes.crac import CRACValidator from .classes.server import UserServer from .classes.server_manifest import UserServerManifest from .classes.storage import AutosaveBranch -from ..errors.user import ImageParseError, MissingResourceError, UserInputError from .schemas.autosave import AutosavesList from .schemas.config_server_options import ServerOptionsEndpointResponse from .schemas.logs import ServerLogs +from .schemas.server_options import ServerOptions from .schemas.servers_get import NotebookResponse, ServersGetRequest, ServersGetResponse from .schemas.servers_post import LaunchNotebookRequest from .schemas.version import VersionResponse -from ..util.kubernetes_ import make_server_name bp = Blueprint("notebooks_blueprint", __name__, url_prefix=config.service_prefix) @@ -169,6 +166,8 @@ def launch_notebook( notebook, image, server_options, + resource_class_id, + storage, environment_variables, cloudstorage=None, ): @@ -201,6 +200,7 @@ def launch_notebook( tags: - servers """ + crac_validator = CRACValidator(config.crac_url) server_name = make_server_name( user.safe_username, namespace, project, branch, commit_sha ) @@ -208,6 +208,42 @@ def launch_notebook( if server: return NotebookResponse().dump(UserServerManifest(server)), 200 + parsed_server_options = None + if resource_class_id is not None: + # A resource class ID was passed in, validate with CRAC servuce + parsed_server_options = crac_validator.validate_class_storage( + user, resource_class_id, storage + ) + elif server_options > ServerOptions(0, 0, 0, 0): + # The old style API was used, try to find a matching class from the CRAC service + parsed_server_options = crac_validator.find_acceptable_class( + user, server_options + ) + if parsed_server_options is None: + raise UserInputError( + message="Cannot find suitable server options based on your request and " + "the available resource classes.", + detail="You are receiving this error because you are using the old API for " + "selecting resources. Updating to the new API which includes specifying only " + "a specific resource class ID and storage is preferred and more convenient.", + ) + else: + # No resource class ID specified or old-style server options, use defaults from CRAC + default_resource_class = crac_validator.get_default_class() + max_storage_gb = default_resource_class.get("max_storage", 0) + if storage is not None and storage > max_storage_gb: + raise UserInputError( + "The requested storage amount is higher than the " + f"allowable maximum for the default resource class of {max_storage_gb}GB." + ) + if storage is None: + storage = default_resource_class.get("default_storage") + parsed_server_options = ServerOptions.from_resource_class( + default_resource_class + ) + # ServerOptions stores memory and storage in bytes, but storage in request is in GB + parsed_server_options.storage = storage * 1000000000 + server = UserServer( user, namespace, @@ -216,7 +252,7 @@ def launch_notebook( commit_sha, notebook, image, - server_options, + parsed_server_options, environment_variables, cloudstorage or [], config.k8s.client, diff --git a/renku_notebooks/api/schemas/server_options.py b/renku_notebooks/api/schemas/server_options.py index 534c5cc14..c5ee25d77 100644 --- a/renku_notebooks/api/schemas/server_options.py +++ b/renku_notebooks/api/schemas/server_options.py @@ -1,39 +1,93 @@ -from marshmallow import Schema, ValidationError, fields +from dataclasses import dataclass +from typing import Optional, Callable + +from marshmallow import Schema, fields, post_load from ...config import config from .custom_fields import ByteSizeField, CpuField, GpuField -def get_validator(field_name, server_options_ui, server_options_defaults): - def _validate(value): - if field_name in server_options_ui: - if server_options_ui[field_name].get("allow_any_value", False): - return True - elif "value_range" in server_options_ui[field_name]: - within_range = ( - value >= server_options_ui[field_name]["value_range"]["min"] - and value <= server_options_ui[field_name]["value_range"]["max"] - ) - if not within_range: - raise ValidationError( - f"Provided {field_name} value not within allowed range of " - f"{server_options_ui[field_name]['value_range']['min']} and " - f"{server_options_ui[field_name]['value_range']['max']}." - ) - else: - if value not in server_options_ui[field_name]["options"]: - raise ValidationError( - f"Provided {field_name} value is not in the allowed options " - f"{server_options_ui[field_name]['options']}" - ) - else: - if value != server_options_defaults[field_name]: - raise ValidationError( - f"Provided {field_name} value does not match the allowed value of " - f"{server_options_defaults[field_name]}" - ) - - return _validate +@dataclass +class ServerOptions: + """Server options. Memory and storage are in bytes.""" + + cpu: float + memory: int + gpu: int + storage: Optional[int] = None + default_url: Optional[str] = None + lfs_auto_fetch: bool = False + gigabytes: bool = False + + def __post_init__(self): + if self.default_url is None: + self.default_url = config.server_options.defaults["defaultUrl"] + if self.lfs_auto_fetch is None: + self.lfs_auto_fetch = config.server_options.defaults["lfs_auto_fetch"] + + def __compare( + self, + other: "ServerOptions", + compare_func: Callable[["ServerOptions", "ServerOptions"], bool], + ) -> bool: + results = [ + compare_func(self.cpu, other.cpu), + compare_func(self.memory, other.memory), + compare_func(self.gpu, other.gpu), + ] + self_storage = 0 if self.storage is None else self.storage + other_storage = 0 if other.storage is None else other.storage + results.append(compare_func(self_storage, other_storage)) + return all(results) + + def to_gigabytes(self) -> "ServerOptions": + if self.gigabytes: + return self + return ServerOptions( + cpu=self.cpu, + gpu=self.gpu, + default_url=self.default_url, + lfs_auto_fetch=self.lfs_auto_fetch, + memory=self.memory / 1000000000, + storage=self.storage / 1000000000 if self.storage is not None else None, + gigabytes=True, + ) + + def __sub__(self, other: "ServerOptions") -> "ServerOptions": + self_storage = 0 if self.storage is None else self.storage + other_storage = 0 if other.storage is None else other.storage + return ServerOptions( + cpu=self.cpu - other.cpu, + memoory=self.memory - other.memory, + gpu=self.gpu - other.gpu, + storage=self_storage - other_storage, + ) + + def __ge__(self, other: "ServerOptions"): + return self.__compare(other, lambda x, y: x >= y) + + def __gt__(self, other: "ServerOptions"): + return self.__compare(other, lambda x, y: x > y) + + def __lt__(self, other: "ServerOptions"): + return self.__compare(other, lambda x, y: x < y) + + def __le__(self, other: "ServerOptions"): + return self.__compare(other, lambda x, y: x <= y) + + @classmethod + def from_resource_class(cls, data: dict) -> "ServerOptions": + """Convert a CRAC resource class to server options. CRAC users GB for storage and memory + whereas the notebook service uses bytes so we convert to bytes here.""" + storage = data.get("storage") + if storage is not None: + storage = storage * 1000000000 + return cls( + cpu=data["cpu"], + memory=data["memory"] * 1000000000, + gpu=data["gpu"], + storage=storage, + ) class LaunchNotebookRequestServerOptions(Schema): @@ -41,42 +95,38 @@ class LaunchNotebookRequestServerOptions(Schema): required=False, missing=config.server_options.defaults["defaultUrl"], ) + # NOTE: The old-style API server options are only used to then find suitable + # resource class form the crac service. "Suitable" in this case is any resource + # class where all its parameters are greather than or equal to the request. So + # by assigning a value of 0 to a server option we are ensuring that CRAC will + # be able to easily find a match. cpu_request = CpuField( required=False, - missing=config.server_options.defaults["cpu_request"], - validate=get_validator( - "cpu_request", - config.server_options.ui_choices, - config.server_options.defaults, - ), + missing=0, ) mem_request = ByteSizeField( required=False, - missing=config.server_options.defaults["mem_request"], - validate=get_validator( - "mem_request", - config.server_options.ui_choices, - config.server_options.defaults, - ), + missing=0, ) disk_request = ByteSizeField( required=False, - missing=config.server_options.defaults["disk_request"], - validate=get_validator( - "disk_request", - config.server_options.ui_choices, - config.server_options.defaults, - ), + missing=0, ) lfs_auto_fetch = fields.Bool( required=False, missing=config.server_options.defaults["lfs_auto_fetch"] ) gpu_request = GpuField( required=False, - missing=config.server_options.defaults["gpu_request"], - validate=get_validator( - "gpu_request", - config.server_options.ui_choices, - config.server_options.defaults, - ), + missing=0, ) + + @post_load + def make_dataclass(slef, data, **kwargs): + return ServerOptions( + cpu=data["cpu_request"], + gpu=data["gpu_request"], + memory=data["memory_request"], + default_url=data["defaultUrl"], + lfs_auto_fetch=data["lfs_auto_fetch"], + storage=data["disk_request"], + ) diff --git a/renku_notebooks/api/schemas/servers_post.py b/renku_notebooks/api/schemas/servers_post.py index 1b258c943..6b6cfe50e 100644 --- a/renku_notebooks/api/schemas/servers_post.py +++ b/renku_notebooks/api/schemas/servers_post.py @@ -24,6 +24,8 @@ class LaunchNotebookRequestWithoutS3(Schema): data_key="serverOptions", required=False, ) + resource_class_id = fields.Int(required=False, load_default=None) + storage = fields.Int(required=False, load_default=1) environment_variables = fields.Dict( keys=fields.Str(), values=fields.Str(), load_default=dict() ) diff --git a/renku_notebooks/config/__init__.py b/renku_notebooks/config/__init__.py index 29c0c5460..fc16cfc41 100644 --- a/renku_notebooks/config/__init__.py +++ b/renku_notebooks/config/__init__.py @@ -32,6 +32,7 @@ class _NotebooksConfig: service_prefix: str = "/notebooks" version: str = "0.0.0" keycloak_realm: str = "Renku" + crac_url: str = "http://renku-crac" def __post_init__(self): self.anonymous_sessions_enabled = _parse_str_as_bool( @@ -203,6 +204,7 @@ def get_config(default_config: str) -> _NotebooksConfig: service_prefix = /notebooks version = 0.0.0 keycloak_realm = Renku +crac_url = http://renku-crac """ config = get_config(default_config) diff --git a/renku_notebooks/errors/user.py b/renku_notebooks/errors/user.py index 91342f2ad..5a1bcdcca 100644 --- a/renku_notebooks/errors/user.py +++ b/renku_notebooks/errors/user.py @@ -76,3 +76,14 @@ class OverriddenEnvironmentVariableError(UserInputError): message: str code: int = UserInputError.code + 4 + + +@dataclass +class InvalidComputeResourceError(UserInputError): + """Raised when invalid server options are requested or when the user has not access + to a resource class.""" + + message: str = ( + "The specified server options or resources are invalid or cannot be accessed." + ) + code: int = UserInputError.code + 5 diff --git a/tests/unit/test_server_class/test_manifest.py b/tests/unit/test_server_class/test_manifest.py index fe1c6540a..663f94593 100644 --- a/tests/unit/test_server_class/test_manifest.py +++ b/tests/unit/test_server_class/test_manifest.py @@ -4,6 +4,7 @@ from renku_notebooks.api.classes.k8s_client import K8sClient from renku_notebooks.api.classes.server import UserServer +from renku_notebooks.api.schemas.server_options import ServerOptions from renku_notebooks.errors.programming import DuplicateEnvironmentVariableError from renku_notebooks.errors.user import OverriddenEnvironmentVariableError @@ -11,13 +12,14 @@ "namespace": "test-namespace", "project": "test-project", "image": None, - "server_options": { - "lfs_auto_fetch": 0, - "defaultUrl": "/lab", - "cpu_request": "100", - "mem_request": "100", - "disk_request": "100", - }, + "server_options": ServerOptions( + lfs_auto_fetch=0, + default_url="/lab", + cpu=100, + memory=100, + storage=100, + gpu=0, + ), "branch": "master", "commit_sha": "abcdefg123456789", "notebook": "",