Skip to content

Commit

Permalink
feat(app): use crac service
Browse files Browse the repository at this point in the history
  • Loading branch information
olevski committed May 15, 2023
1 parent 09d8891 commit 999ea22
Show file tree
Hide file tree
Showing 10 changed files with 287 additions and 101 deletions.
2 changes: 1 addition & 1 deletion renku_notebooks/api/amalthea_patches/init_containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def git_clone(server: "UserServer"):
},
{
"name": "GIT_CLONE_LFS_AUTO_FETCH",
"value": "1" if server.server_options["lfs_auto_fetch"] else "0",
"value": "1" if server.server_options.lfs_auto_fetch else "0",
},
{"name": "GIT_CLONE_COMMIT_SHA", "value": server.commit_sha},
{"name": "GIT_CLONE_BRANCH", "value": server.branch},
Expand Down
98 changes: 98 additions & 0 deletions renku_notebooks/api/classes/crac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional

import requests

from ...errors.intermittent import IntermittentError
from ...errors.programming import ConfigurationError
from ...errors.user import InvalidComputeResourceError
from ..schemas.server_options import ServerOptions
from .user import User


@dataclass
class CRACValidator:
"""Calls to the CRAC service to validate resource requests."""

crac_url: str

def __post_init__(self):
self.crac_url = self.crac_url.rstrip("/")

def validate_class_storage(
self,
user: User,
class_id: int,
storage: Optional[int] = None,
) -> ServerOptions:
"""Ensures that the resource class and storage requested is valid."""
headers = None
if user.access_token is not None:
headers = {"Authorization": f"bearer {user.access_token}"}
res = requests.get(self.crac_url + f"/classes/{class_id}", headers=headers)
if res.status_code != 200:
raise InvalidComputeResourceError(
message="The requested resource class does not exist or you do not "
"have the required permissions to access it."
)
res_class = res.json()
if storage is None:
storage = res_class.get("default_storage", 1)
if storage < 1:
raise InvalidComputeResourceError(
message="Storage requests have to be greater than or equal to 1GB."
)
if storage > res_class.get("max_storage"):
raise InvalidComputeResourceError(
message="The requested storage surpasses the maximum value allowed."
)
# Memory and disk space in CRAC are assumed to be in gigabytes whereas
# the notebook service assumes that if a plain number is used then it is bytes.
options = ServerOptions.from_resource_class(res_class)
options.storage = storage * 1000000000
return options

def get_default_class(self) -> Dict[str, Any]:
res = requests.get(self.crac_url + "/resource_pools")
if res.status_code != 200:
raise IntermittentError(
"The CRAC sent an unexpected response, please try again later."
)
pools = res.json()
default_pools = [p for p in pools if p.get("default", False)]
if len(default_pools) < 1:
raise ConfigurationError("Cannot find the default resource pool.")
default_pool = default_pools[0]
return default_pool

def find_acceptable_class(
self, user: User, requested_server_options: ServerOptions
) -> Optional[ServerOptions]:
"""Find a resource class that is available to the user that is greater than or equal to
the old-style server options that the user requested."""
headers = None
if user.access_token is not None:
headers = {"Authorization": f"bearer {user.access_token}"}
res = requests.get(self.crac_url + "/resource_pools", headers=headers)
if res.status_code != 200:
raise IntermittentError(
message="The compute resource access control service sent "
"an unexpected response, please try again later",
)
resource_pools = res.json()
# Difference and best candidate in the case that the resource class will be
# greater than or equal to the request
best_larger_or_equal_diff = None
best_larger_or_equal_class = None
zero_diff = ServerOptions(cpu=0, memory=0, gpu=0, storage=0)
for resource_pool in resource_pools:
for resource_class in resource_pool["classes"]:
resource_class_mdl = ServerOptions.from_resource_class(resource_class)
diff = resource_class_mdl - requested_server_options
if diff >= zero_diff and (
best_larger_or_equal_diff is None
or diff < best_larger_or_equal_diff
):
best_larger_or_equal_diff = diff
best_larger_or_equal_class = resource_class
return best_larger_or_equal_class
38 changes: 10 additions & 28 deletions renku_notebooks/api/classes/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from ...errors.user import MissingResourceError
from .k8s_client import K8sClient
from .cloud_storage import ICloudStorageRequest
from ..schemas.server_options import ServerOptions
from .user import RegisteredUser, User
from ...util.check_image import (
get_docker_token,
Expand All @@ -43,7 +44,7 @@ def __init__(
commit_sha: str,
notebook: Optional[str], # TODO: Is this value actually needed?
image: Optional[str],
server_options: Dict[str, Any],
server_options: ServerOptions,
environment_variables: Dict[str, str],
cloudstorage: List[ICloudStorageRequest],
k8s_client: K8sClient,
Expand Down Expand Up @@ -221,40 +222,21 @@ def _get_registry_secret(self, b64encode=True):
return output

def _get_session_k8s_resources(self):
cpu_request = float(self.server_options["cpu_request"])
mem = self.server_options["mem_request"]
gpu_req = self.server_options.get("gpu_request", {})
gpu = {"nvidia.com/gpu": str(gpu_req)} if gpu_req else None
cpu_request = float(self.server_options.cpu)
mem = self.server_options.memory
gpu_req = self.server_options.gpu
gpu = {"nvidia.com/gpu": str(gpu_req)} if gpu_req > 0 else None
resources = {
"requests": {"memory": mem, "cpu": cpu_request},
"limits": {"memory": mem},
}
if config.sessions.enforce_cpu_limits == "lax":
if "cpu_request" in config.server_options.ui_choices:
resources["limits"]["cpu"] = max(
config.server_options.ui_choices["cpu_request"]["options"]
)
else:
resources["limits"]["cpu"] = cpu_request
resources["limits"]["cpu"] = 2 * cpu_request
elif config.sessions.enforce_cpu_limits == "strict":
resources["limits"]["cpu"] = cpu_request
if gpu:
resources["requests"] = {**resources["requests"], **gpu}
resources["limits"] = {**resources["limits"], **gpu}
if "ephemeral-storage" in self.server_options.keys():
ephemeral_storage = (
self.server_options["ephemeral-storage"]
if config.sessions.storage.pvs_enabled
else self.server_options["disk_request"]
)
resources["requests"] = {
**resources["requests"],
"ephemeral-storage": ephemeral_storage,
}
resources["limits"] = {
**resources["limits"],
"ephemeral-storage": ephemeral_storage,
}
return resources

def _get_session_manifest(self):
Expand Down Expand Up @@ -290,7 +272,7 @@ def _get_session_manifest(self):
# Storage
if config.sessions.storage.pvs_enabled:
storage = {
"size": self.server_options["disk_request"],
"size": self.server_options.storage,
"pvc": {
"enabled": True,
"storageClassName": config.sessions.storage.pvs_storage_class,
Expand All @@ -299,7 +281,7 @@ def _get_session_manifest(self):
}
else:
storage = {
"size": self.server_options["disk_request"]
"size": self.server_options.storage
if config.sessions.storage.use_empty_dir_size_limit
else "",
"pvc": {
Expand Down Expand Up @@ -348,7 +330,7 @@ def _get_session_manifest(self):
),
},
"jupyterServer": {
"defaultUrl": self.server_options["defaultUrl"],
"defaultUrl": self.server_options.default_url,
"image": self.verified_image,
"rootDir": self.image_workdir.rstrip("/")
+ f"/work/{self.gl_project.path}/",
Expand Down
5 changes: 5 additions & 0 deletions renku_notebooks/api/classes/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@


class User(ABC):
@property
@abstractmethod
def access_token(self) -> Optional[str]:
return None

@abstractmethod
def get_autosaves(self, *args, **kwargs):
pass
Expand Down
56 changes: 46 additions & 10 deletions renku_notebooks/api/notebooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,26 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Notebooks service API."""
from flask import Blueprint, current_app, make_response, jsonify
from flask import Blueprint, current_app, jsonify, make_response
from marshmallow import fields, validate
from webargs.flaskparser import use_args

from renku_notebooks.util.check_image import (
get_docker_token,
image_exists,
parse_image_name,
)

from ..config import config
from ..errors.user import ImageParseError, MissingResourceError, UserInputError
from ..util.check_image import get_docker_token, image_exists, parse_image_name
from ..util.kubernetes_ import make_server_name
from .auth import authenticated
from .classes.crac import CRACValidator
from .classes.server import UserServer
from .classes.server_manifest import UserServerManifest
from .classes.storage import AutosaveBranch
from ..errors.user import ImageParseError, MissingResourceError, UserInputError
from .schemas.autosave import AutosavesList
from .schemas.config_server_options import ServerOptionsEndpointResponse
from .schemas.logs import ServerLogs
from .schemas.server_options import ServerOptions
from .schemas.servers_get import NotebookResponse, ServersGetRequest, ServersGetResponse
from .schemas.servers_post import LaunchNotebookRequest
from .schemas.version import VersionResponse
from ..util.kubernetes_ import make_server_name

bp = Blueprint("notebooks_blueprint", __name__, url_prefix=config.service_prefix)

Expand Down Expand Up @@ -169,6 +166,8 @@ def launch_notebook(
notebook,
image,
server_options,
resource_class_id,
storage,
environment_variables,
cloudstorage=None,
):
Expand Down Expand Up @@ -201,13 +200,50 @@ def launch_notebook(
tags:
- servers
"""
crac_validator = CRACValidator(config.crac_url)
server_name = make_server_name(
user.safe_username, namespace, project, branch, commit_sha
)
server = config.k8s.client.get_server(server_name, user.safe_username)
if server:
return NotebookResponse().dump(UserServerManifest(server)), 200

parsed_server_options = None
if resource_class_id is not None:
# A resource class ID was passed in, validate with CRAC servuce
parsed_server_options = crac_validator.validate_class_storage(
user, resource_class_id, storage
)
elif server_options > ServerOptions(0, 0, 0, 0):
# The old style API was used, try to find a matching class from the CRAC service
parsed_server_options = crac_validator.find_acceptable_class(
user, server_options
)
if parsed_server_options is None:
raise UserInputError(
message="Cannot find suitable server options based on your request and "
"the available resource classes.",
detail="You are receiving this error because you are using the old API for "
"selecting resources. Updating to the new API which includes specifying only "
"a specific resource class ID and storage is preferred and more convenient.",
)
else:
# No resource class ID specified or old-style server options, use defaults from CRAC
default_resource_class = crac_validator.get_default_class()
max_storage_gb = default_resource_class.get("max_storage", 0)
if storage is not None and storage > max_storage_gb:
raise UserInputError(
"The requested storage amount is higher than the "
f"allowable maximum for the default resource class of {max_storage_gb}GB."
)
if storage is None:
storage = default_resource_class.get("default_storage")
parsed_server_options = ServerOptions.from_resource_class(
default_resource_class
)
# ServerOptions stores memory and storage in bytes, but storage in request is in GB
parsed_server_options.storage = storage * 1000000000

server = UserServer(
user,
namespace,
Expand All @@ -216,7 +252,7 @@ def launch_notebook(
commit_sha,
notebook,
image,
server_options,
parsed_server_options,
environment_variables,
cloudstorage or [],
config.k8s.client,
Expand Down
Loading

0 comments on commit 999ea22

Please sign in to comment.