
Commit

feat: rclone csi support
Panaetius authored and Ralf Grubenmann committed Nov 15, 2023
1 parent e39af86 commit cf4134a
Showing 2 changed files with 127 additions and 83 deletions.
17 changes: 6 additions & 11 deletions renku_notebooks/api/notebooks.py
@@ -321,17 +321,12 @@ def launch_notebook(
     if cloudstorage:
         gl_project_id = gl_project.id if gl_project is not None else 0
         try:
-            cloudstorage = list(
-                map(
-                    partial(
-                        create_cloud_storage_object,
-                        user=user,
-                        project_id=gl_project_id,
-                        work_dir=server_work_dir.absolute(),
-                    ),
-                    cloudstorage,
-                )
-            )
+            for storage in cloudstorage:
+                storage.init_config(
+                    user=user,
+                    project_id=gl_project_id,
+                    work_dir=server_work_dir.absolute(),
+                )
         except ValidationError as e:
             raise UserInputError(f"Couldn't load cloud storage config: {str(e)}")
         mount_points = set(
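For context, the loop above lets each cloud storage entry in the launch request resolve its own configuration. Below is a minimal, purely illustrative sketch of such an entry; the field names come from the RCloneStorageRequest schema in the next file, while every value (bucket, endpoint, paths) is a made-up assumption and not part of this commit.

# Hypothetical cloud storage entry in a notebook launch request (illustrative values only).
storage_request = {
    "source_path": "my-bucket/data",    # remote bucket plus optional sub-path
    "target_path": "data",              # mounted under the server's work dir
    "readonly": True,
    "configuration": {"type": "s3", "endpoint": "s3.example.com", "region": "eu-west-1"},
}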
193 changes: 121 additions & 72 deletions renku_notebooks/api/schemas/cloud_storage.py
@@ -1,7 +1,10 @@
 from pathlib import Path
-from typing import Any, Dict, Optional
+from io import StringIO
+from configparser import ConfigParser
+from typing import Any, Dict, Optional, List

 from marshmallow import EXCLUDE, Schema, ValidationError, fields, validates_schema
+from renku_notebooks.errors.programming import ProgrammingError

 from ...config import config
 from ..classes.cloud_storage.azure_blob import AzureBlobRequest
@@ -20,6 +23,7 @@ class Meta:
     )
     storage_id: Optional[str] = fields.Str(load_default=None, allow_none=True)
     readonly: bool = fields.Bool(load_default=True, allow_none=False)
+    _mount_folder = None

     @validates_schema
     def validate_storage(self, data, **kwargs):
@@ -28,80 +32,125 @@ def validate_storage(self, data, **kwargs):
"'storage_id' cannot be used together with 'source_path' or 'target_path'"
)

+    def init_config(self, data, user, project_id, work_dir):
+        if self.storage_id:
+            # Load from storage service
+            if user.access_token is None:
+                raise ValidationError("Storage mounting is only supported for logged-in users.")
+            if project_id < 1:
+                raise ValidationError("Could not get gitlab project id")
+            (
+                configuration,
+                self.source_path,
+                self.target_path,
+                readonly,
+            ) = config.storage_validator.get_storage_by_id(user, project_id, self.storage_id)
+            self.configuration = {**configuration, **(self.configuration or {})}
+            self.readonly = data.get("readonly", readonly)
+        else:
+            self.source_path = data["source_path"]
+            self.target_path = data["target_path"]
+            self.configuration = data["configuration"]
+            self.readonly = data.get("readonly", True)
+        config.storage_validator.validate_storage_configuration(self.configuration)
+        self._mount_folder = work_dir / self.target_path

-def create_cloud_storage_object(data: Dict[str, Any], user: User, project_id: int, work_dir: Path):
-    if data.get("storage_id") and (data.get("source_path") or data.get("target_path")):
-        raise ValidationError(
-            "'storage_id' cannot be used together with 'source_path' or 'target_path'"
-        )
-    if data.get("storage_id"):
-        # Load from storage service
-        if user.access_token is None:
-            raise ValidationError("Storage mounting is only supported for logged-in users.")
-        if project_id < 1:
-            raise ValidationError("Could not get gitlab project id")
-        (
-            configuration,
-            source_path,
-            target_path,
-            readonly,
-        ) = config.storage_validator.get_storage_by_id(user, project_id, data["storage_id"])
-        configuration = {**configuration, **(data.get("configuration") or {})}
-        readonly = data.get("readonly", readonly)
-    else:
-        source_path = data["source_path"]
-        target_path = data["target_path"]
-        configuration = data["configuration"]
-        readonly = data.get("readonly", True)
+    @property
+    def mount_folder(self):
+        if not self._mount_folder:
+            raise ProgrammingError("mount_folder not set. Ensure init_config was called first.")
+        return self._mount_folder

-    config.storage_validator.validate_storage_configuration(configuration)
-
-    path = source_path.lstrip("/")
-    if "/" in path:
-        bucket, source_path = path.split("/", 1)
-    else:
-        bucket, source_path = path, ""
-
-    cloud_storage: AzureBlobRequest | S3Request
-    if (
-        configuration.get("type") == "azureblob"
-        and configuration.get("access_key_id") is None
-        and configuration.get("secret_access_key") is not None
-        and config.cloud_storage.azure_blob.enabled
-    ):
-        cloud_storage = AzureBlobRequest(
-            endpoint=configuration["endpoint"],
-            container=bucket,
-            credential=configuration["secret_access_key"],
-            mount_folder=str(work_dir / target_path),
-            source_folder=source_path,
-            read_only=readonly,
-        )
-    elif configuration.get("type") == "s3" and config.cloud_storage.s3.enabled:
-        cloud_storage = S3Request(
-            endpoint=configuration.get("endpoint"),
-            region=configuration.get("region"),
-            bucket=bucket,
-            access_key=configuration.get("access_key_id"),
-            secret_key=configuration.get("secret_access_key"),
-            mount_folder=work_dir / target_path,
-            source_folder=source_path,
-            read_only=data.get("readonly", True),
-        )
-    else:
-        raise ValidationError(
-            "Cannot accept the provided cloud storage parameters because "
-            "the requested storage type has not been properly setup or enabled."
-        )
-
-    if not cloud_storage.exists:
-        raise ValidationError(
-            f"Cannot find bucket {bucket} at endpoint {cloud_storage.endpoint}. "
-            "Please make sure you have provided the correct "
-            "credentials, bucket name and endpoint."
-        )
-    return cloud_storage
+    def get_manifest_patch(
+        self, base_name: str, namespace: str, labels={}, annotations={}
+    ) -> List[Dict[str, Any]]:
+        patches = []
+        patches.append(
+            {
+                "type": "application/json-patch+json",
+                "patch": [
+                    {
+                        "op": "add",
+                        "path": f"/{base_name}",
+                        "value": {
+                            "apiVersion": "v1",
+                            "kind": "PersistentVolume",
+                            "metadata": {
+                                "name": base_name,
+                                "labels": {"name": base_name},
+                            },
+                            "spec": {
+                                "accessModes": [
+                                    "ReadOnlyMany" if self.readonly else "ReadWriteMany"
+                                ],
+                                "capacity": {"storage": "10Gi"},
+                                "storageClassName": "rclone",
+                                "csi": {
+                                    "driver": "csi-rclone",
+                                    "volumeHandle": base_name,
+                                    "volumeAttributes": {
+                                        "remote": base_name,
+                                        "remotePath": self.source_folder,
+                                        "configData": self.config_string(base_name),
+                                    },
+                                },
+                            },
+                        },
+                    },
+                    {
+                        "op": "add",
+                        "path": f"/{base_name}",
+                        "value": {
+                            "apiVersion": "v1",
+                            "kind": "PersistentVolumeClaim",
+                            "metadata": {
+                                "name": base_name,
+                                "namespace": namespace,
+                            },
+                            "spec": {
+                                "storageClassName": "rclone",
+                                "accessModes": [
+                                    "ReadOnlyMany" if self.readonly else "ReadWriteMany"
+                                ],
+                                "resources": {"requests": {"storage": "10Gi"}},
+                            },
+                        },
+                    },
+                    {
+                        "op": "add",
+                        "path": "/statefulset/spec/template/spec/containers/0/volumeMounts/-",
+                        "value": {
+                            "mountPath": self.mount_folder,
+                            "name": base_name,
+                        },
+                    },
+                    {
+                        "op": "add",
+                        "path": "/statefulset/spec/template/spec/volumes/-",
+                        "value": {
+                            "name": base_name,
+                            "persistentVolumeClaim": {"claimName": base_name},
+                        },
+                    },
+                ],
+            }
+        )
+        return patches


+    def config_string(self, name: str) -> str:
+        if not self.configuration:
+            raise ValidationError("Missing configuration for cloud storage")
+        parser = ConfigParser()
+        parser.add_section(name)
+        for k, v in self.configuration.items():
+            parser.set(name, k, v)
+        stringio = StringIO()
+        parser.write(stringio)
+        return stringio.getvalue()

 class LaunchNotebookResponseCloudStorage(RCloneStorageRequest):
     class Meta:
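To make the new flow concrete, here is a rough sketch of how the methods added above fit together. It assumes a loaded storage object named storage on which init_config has already been called (as in the notebooks.py change), and the PV/PVC name and namespace are invented for illustration; none of this code is part of the commit itself.

# Illustrative only: "storage" is assumed to be a loaded RCloneStorageRequest-style
# object whose init_config() has already run, as in the notebooks.py change above.
base_name = "my-server-cloudstorage-0"   # hypothetical PV/PVC name

# config_string() renders the rclone configuration as an INI section named after
# base_name, roughly:
#
#   [my-server-cloudstorage-0]
#   type = s3
#   endpoint = s3.example.com
#
# That string is what get_manifest_patch() places in csi.volumeAttributes.configData.
config_data = storage.config_string(base_name)

# get_manifest_patch() returns JSON-patch documents adding a PersistentVolume backed by
# the csi-rclone driver, a matching PersistentVolumeClaim, and the volume/volumeMount
# entries on the session statefulset.
patches = storage.get_manifest_patch(base_name=base_name, namespace="renku")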
