From a47cd1fb733e19136fc1c61c859835b953f85249 Mon Sep 17 00:00:00 2001 From: Ofer Koren Date: Mon, 5 Aug 2024 11:52:19 +0000 Subject: [PATCH] VAST Data CSI Plugin - v2.4.1 (from f38da9c67ca435a7ff45df051d411078c44cd795) --- .../prepare_releaser_configuration.py | 2 +- .gitlab-ci.yml | 4 +- CHANGELOG.md | 5 + charts/vastcosi/values.yaml | 2 +- charts/vastcsi/templates/controller.yaml | 8 +- .../vastcsi/templates/shared/_common_env.tpl | 7 +- charts/vastcsi/templates/shared/_vms_auth.tpl | 8 +- charts/vastcsi/templates/snapshot-class.yaml | 43 +++- charts/vastcsi/templates/storage-class.yaml | 39 ++- charts/vastcsi/values.yaml | 73 ++++-- packaging/files/constraints.txt | 1 + packaging/files/requirements.txt | 1 + tests/conftest.py | 15 +- tests/test_controller.py | 30 +-- tests/test_cosi_privisioner.py | 27 +-- tests/test_vms_session.py | 47 ++-- vast_csi/configuration.py | 12 +- vast_csi/exceptions.py | 4 + vast_csi/server.py | 223 ++++++++++-------- vast_csi/utils.py | 19 -- vast_csi/vms_session.py | 223 ++++++++++++------ vast_csi/volume_builder.py | 46 ++-- version.txt | 2 +- 23 files changed, 525 insertions(+), 316 deletions(-) diff --git a/.github/workflows/prepare_releaser_configuration.py b/.github/workflows/prepare_releaser_configuration.py index c892daab..65d994af 100644 --- a/.github/workflows/prepare_releaser_configuration.py +++ b/.github/workflows/prepare_releaser_configuration.py @@ -30,7 +30,7 @@ release_name_template = "helm-{{ .Name }}-{{ .Version }}" pages_branch = "gh-pages-beta" if is_beta else "gh-pages" - version = f"{VERSION}-beta.{SHA}" if is_beta else VERSION + version = f"{VERSION}-beta" if is_beta else VERSION # Create unique release name based on version and commit sha for chart in CHARTS: diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 59eea0f1..df3f9d79 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -41,7 +41,9 @@ build_csi: set -x VERSION=$(cat version.txt) TAGGED=$(grep 'the version of the Vast CSI driver' 
charts/vastcsi/values.yaml | awk '{print $2}') - if [[ "$TAGGED" != "$VERSION" ]]; then + if [[ "$TAGGED" == *"beta"* ]]; then + echo "skip version verification. Version is beta" + elif [[ "$TAGGED" != "$VERSION" ]]; then echo "version.txt has $VERSION, while our helm chart has $TAGGED (check charts/vastcsi/values.yaml)" exit 5 fi diff --git a/CHANGELOG.md b/CHANGELOG.md index b295198b..76f41783 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # CHANGELOG +## Version 2.4.1 +* Support for multiple Vast Clusters via using StorageClass secrets (VCSI-140) +* Set a timeout on requests to VMS, to prevent worker threads hanging (VCSI-183) +* Improve mounting performance by support the use of VIPPool DNS, skipping an API call to the VMS (VCSI-167) + ## Version 2.4.0 * added Container Object Storage Interface (COSI) support (VCSI-159) * added formal support for multitenancy via StorageClasses (VCSI-147) diff --git a/charts/vastcosi/values.yaml b/charts/vastcosi/values.yaml index d496c265..6d1bbc1a 100644 --- a/charts/vastcosi/values.yaml +++ b/charts/vastcosi/values.yaml @@ -62,7 +62,7 @@ truncateVolumeName: 64 image: csiVastPlugin: repository: vastdataorg/csi - tag: v2.4.0 + tag: v2.4.1-beta-1426549 imagePullPolicy: IfNotPresent objectstorageProvisioner: repository: gcr.io/k8s-staging-sig-storage/objectstorage-sidecar/objectstorage-sidecar diff --git a/charts/vastcsi/templates/controller.yaml b/charts/vastcsi/templates/controller.yaml index bcf9b611..0cd1efa4 100644 --- a/charts/vastcsi/templates/controller.yaml +++ b/charts/vastcsi/templates/controller.yaml @@ -141,11 +141,13 @@ spec: nodeSelector: {{- if .Values.controller.runOnMaster}} node-role.kubernetes.io/master: "" - {{- end}} - {{- if .Values.controller.runOnControlPlane}} + {{- end }} +{{- if .Values.controller.runOnControlPlane}} node-role.kubernetes.io/control-plane: "" - {{- end}} + {{- end }} +{{- if .Values.controller.nodeSelector }} {{ toYaml .Values.controller.nodeSelector | indent 8 }} + {{- 
end }} priorityClassName: {{ .Values.controller.priorityClassName }} serviceAccount: {{ .Release.Name }}-vast-controller-sa tolerations: diff --git a/charts/vastcsi/templates/shared/_common_env.tpl b/charts/vastcsi/templates/shared/_common_env.tpl index 3ff01af1..4557ac7d 100644 --- a/charts/vastcsi/templates/shared/_common_env.tpl +++ b/charts/vastcsi/templates/shared/_common_env.tpl @@ -4,15 +4,12 @@ # changes in the corresponding template in the other chart. */}} -{{- define "vastcsi.commonEnv" -}} +{{- define "vastcsi.commonEnv" }} -{{- if (urlParse (required "endpoint is required" $.Values.endpoint )).scheme }} - {{- fail "endpoint requires only host to be provided. Please exclude 'http//|https//' from url." -}} -{{- end }} - name: X_CSI_PLUGIN_NAME value: "csi.vastdata.com" - name: X_CSI_VMS_HOST - value: {{ $.Values.endpoint | quote }} + value: {{ $.Values.endpoint | default "" | quote }} - name: X_CSI_ENABLE_VMS_SSL_VERIFICATION value: {{ $.Values.verifySsl | quote }} - name: X_CSI_DELETION_VIP_POOL_NAME diff --git a/charts/vastcsi/templates/shared/_vms_auth.tpl b/charts/vastcsi/templates/shared/_vms_auth.tpl index ac346c4f..4fad2960 100644 --- a/charts/vastcsi/templates/shared/_vms_auth.tpl +++ b/charts/vastcsi/templates/shared/_vms_auth.tpl @@ -4,21 +4,23 @@ {{- define "vastcsi.vmsAuthVolume" -}} {{- if and .Values.sslCert .Values.sslCertsSecretName -}} {{- - fail (printf "Ambiguous origin of the 'sslCert'. The certificate is found in both the '%s' secret and the command line --from-file argument." .Values.secretName) + fail (printf "Ambiguous origin of the 'sslCert'. The certificate is found in both the '%s' secret and the command line --from-file argument." .Values.sslCertsSecretName) -}} {{- end -}} {{- if and .ca_bundle (not .Values.verifySsl) -}} {{- fail "When sslCert is provided `verifySsl` must be set to true." 
-}} {{- end }} +{{- if $.Values.secretName }} - name: vms-auth secret: - secretName: {{ required "secretName field must be specified" .Values.secretName | quote }} + secretName: {{ $.Values.secretName | quote }} items: - key: username path: username - key: password path: password +{{- end }} {{- if $.ca_bundle }} - name: vms-ca-bundle secret: @@ -32,9 +34,11 @@ {{/* Volume bindings for vms credentials and vms session certificates */}} {{ define "vastcsi.vmsAuthVolumeMount" }} +{{- if $.Values.secretName }} - name: vms-auth mountPath: /opt/vms-auth readOnly: true +{{- end }} {{- if $.ca_bundle }} - name: vms-ca-bundle mountPath: /etc/ssl/certs diff --git a/charts/vastcsi/templates/snapshot-class.yaml b/charts/vastcsi/templates/snapshot-class.yaml index 726d2019..fb573fda 100644 --- a/charts/vastcsi/templates/snapshot-class.yaml +++ b/charts/vastcsi/templates/snapshot-class.yaml @@ -1,17 +1,48 @@ -{{/* Vast CSI snapshot class */}} +{{/* Generate one or more snapshot classes from 'snapshotClasses' section. */}} + +{{/* Check if .Values.secretName is not empty */}} +{{- if not (empty .Values.secretName) }} + +{{/* If .Values.snapshotClasses is empty, set a default value */}} +{{- if empty .Values.snapshotClasses }} +{{- $_ := set .Values "snapshotClasses" (dict "vastdata-snapshot" (dict)) }} +{{- end -}} +{{- end -}} + +{{/* Iterate over SnapshotClasses from manifest */}} +{{- range $name, $options := .Values.snapshotClasses }} + +{{/* Validate setDefaultSnapshotClass option. 
Options should be either true or false */}} +{{- + $is_default_class := pluck "setDefaultSnapshotClass" $options $.Values.snapshotClassDefaults | first | quote +-}} +{{- if not (or (kindIs "bool" $is_default_class ) ( $is_default_class | mustRegexMatch "true|false" )) -}} + {{- fail "setDefaultSnapshotClass should be either 'true' or 'false'" -}} +{{- end }} + +{{- $snapshot_name_fmt := pluck "snapshotNameFormat" $options $.Values.snapshotClassDefaults | first | quote -}} +{{- $deletion_policy := pluck "deletionPolicy" $options $.Values.snapshotClassDefaults | first | quote -}} + +{{- $snapshot_class_secret := pluck "secretName" $options $.Values.snapshotClassDefaults | first | quote -}} +{{/* Get secretNamespace parameter. If not provided .Release.Namespace is used. */}} +{{- $snapshot_class_secret_namespace := pluck "secretNamespace" $options $.Values.snapshotClassDefaults | first | default $.Release.Namespace | quote -}} -{{- with .Values.snapshotClass }} apiVersion: snapshot.storage.k8s.io/v1 kind: VolumeSnapshotClass metadata: - name: {{ required "snapshotClassName must be non empty string" .snapshotClassName | quote }} + name: {{ required "snapshotClassName must be non empty string" $name }} namespace: {{ include "vastcsi.namespace" $ }} annotations: - snapshot.storage.kubernetes.io/is-default-class: {{ .setDefaultStorageClass | quote }} + snapshot.storage.kubernetes.io/is-default-class: {{ $is_default_class }} labels: {{- include "vastcsi.labels" $ | nindent 4 }} driver: csi.vastdata.com -deletionPolicy: {{ .deletionPolicy | quote }} +deletionPolicy: {{ $deletion_policy }} parameters: - snapshot_name_fmt: {{ .snapshotNameFormat | quote }} + snapshot_name_fmt: {{ $snapshot_name_fmt }} +{{- if ne $snapshot_class_secret ( quote "" ) }} + csi.storage.k8s.io/snapshotter-secret-name: {{ $snapshot_class_secret }} + csi.storage.k8s.io/snapshotter-secret-namespace: {{ $snapshot_class_secret_namespace }} +{{- end }} +--- {{- end }} diff --git 
a/charts/vastcsi/templates/storage-class.yaml b/charts/vastcsi/templates/storage-class.yaml index 35bf0d0b..854feebf 100644 --- a/charts/vastcsi/templates/storage-class.yaml +++ b/charts/vastcsi/templates/storage-class.yaml @@ -1,7 +1,7 @@ {{/* Generate one or more storage classes from 'storageClasses' section. */}} {{- if not .Values.storageClasses -}} - {{- fail "`storageClasses` cannot be empty section. Specify at least one StorageClass with required parameters (vipPool, storagePath etc)" -}} + {{- fail "`storageClasses` cannot be empty section. Specify at least one StorageClass with required parameters (vipPolicy, storagePath etc)" -}} {{- end -}} {{/* Iterate over StorageClasses from manifest */}} @@ -15,12 +15,6 @@ {{- fail "setDefaultStorageClass should be either 'true' or 'false'" -}} {{- end }} -{{/* Validate lbStrategy parameter. Parameter should be either random or roundrobin. */}} -{{- $lb_strategy := pluck "lbStrategy" $options $.Values.storageClassDefaults | first | quote -}} -{{- if not ( $lb_strategy | mustRegexMatch "roundrobin|random" ) -}} - {{- fail "lbStrategy should be either 'random' or 'roundrobin'" -}} -{{- end }} - {{/* Validate storagePath parameter. Parameter should be not empty string. */}} {{- $storage_path := pluck "storagePath" $options $.Values.storageClassDefaults | first | quote -}} {{- if eq $storage_path ( quote "" ) -}} @@ -34,6 +28,12 @@ {{- end }} {{- $vip_pool_name := pluck "vipPool" $options $.Values.storageClassDefaults | first | quote -}} +{{- $vip_pool_fqdn := pluck "vipPoolFQDN" $options $.Values.storageClassDefaults | first | quote -}} + +{{- if and (ne $vip_pool_name ( quote "" )) (ne $vip_pool_fqdn ( quote "" )) -}} +{{- fail (printf "vipPool and vipPoolFQDN are mutually exclusive in the StorageClass '%s' parameters. Do not set a default value from storageDefaults for either field; choose only one to specify." 
$name) -}} +{{- end }} + {{- $volume_name_fmt := pluck "volumeNameFormat" $options $.Values.storageClassDefaults | first | quote -}} {{- $eph_volume_name_fmt := pluck "ephemeralVolumeNameFormat" $options $.Values.storageClassDefaults | first | quote -}} {{- $qos_policy := pluck "qosPolicy" $options $.Values.storageClassDefaults | first | quote -}} @@ -44,6 +44,10 @@ first | quote | mustRegexMatch "true" | ternary true false -}} +{{- $storage_class_secret := pluck "secretName" $options $.Values.storageClassDefaults | first | quote -}} +{{/* Get secretNamespace parameter. If not provided .Release.Namespace is used. */}} +{{- $storage_class_secret_namespace := pluck "secretNamespace" $options $.Values.storageClassDefaults | first | default $.Release.Namespace | quote -}} + kind: StorageClass apiVersion: storage.k8s.io/v1 provisioner: csi.vastdata.com @@ -58,15 +62,24 @@ reclaimPolicy: {{ $reclaim_policy }} parameters: root_export: {{ $storage_path }} view_policy: {{ $view_policy }} - lb_strategy: {{ $lb_strategy }} + lb_strategy: "roundrobin" # deprecated; this is here for backwards compatibility, so users don't have to delete their helm deployment and reinstall (since StorageClass is immutable) volume_name_fmt: {{ $volume_name_fmt }} eph_volume_name_fmt: {{ $eph_volume_name_fmt }} -{{- if ne $vip_pool_name ( quote "" ) }} - vip_pool_name: {{ $vip_pool_name }} - {{- end }} -{{- if ne $qos_policy ( quote "" ) }} - qos_policy: {{ $qos_policy }} +{{- range $key, $value := dict "vip_pool_name" $vip_pool_name "vip_pool_fqdn" $vip_pool_fqdn "qos_policy" $qos_policy }} + {{- if and $value (ne $value ( quote "" )) }} + {{ $key }}: {{ $value }} {{- end }} +{{- end }} +{{- if ne $storage_class_secret ( quote "" ) }} + csi.storage.k8s.io/provisioner-secret-name: {{ $storage_class_secret }} + csi.storage.k8s.io/provisioner-secret-namespace: {{ $storage_class_secret_namespace }} + csi.storage.k8s.io/controller-publish-secret-name: {{ $storage_class_secret }} + 
csi.storage.k8s.io/controller-publish-secret-namespace: {{ $storage_class_secret_namespace }} + csi.storage.k8s.io/node-publish-secret-name: {{ $storage_class_secret }} + csi.storage.k8s.io/node-publish-secret-namespace: {{ $storage_class_secret_namespace }} + csi.storage.k8s.io/controller-expand-secret-name: {{ $storage_class_secret }} + csi.storage.k8s.io/controller-expand-secret-namespace: {{ $storage_class_secret_namespace }} +{{- end }} allowVolumeExpansion: {{ $allow_volume_expansion }} {{- if kindIs "string" $mount_options -}} {{/* Keep option to specify mountOptions as string for backward compatibility */}} diff --git a/charts/vastcsi/values.yaml b/charts/vastcsi/values.yaml index 8ae70867..50578c6f 100644 --- a/charts/vastcsi/values.yaml +++ b/charts/vastcsi/values.yaml @@ -1,12 +1,12 @@ #################### # VAST REST SESSION ATTRIBUTES #################### -# Secret name, which corresponds to a secret containing credentials to login - must be provided by user +# Secret name, which corresponds to a secret containing credentials to login - should be provided by user if secretName is not provided in StorageClass attributes # Secret must contain username and password fields # Example: kubectl create secret generic vast-mgmt --from-literal=username='< VAST username >' --from-literal=password='< VAST password >' secretName: "" -# API endpoint of VAST appliance - must be provided by user +# API endpoint of VAST appliance - should be provided by user if secretName is not provided in StorageClass attributes endpoint: "" # Set true to enable certificate validity test @@ -44,18 +44,36 @@ useLocalIpForMount: "" # storageClassDefaults is set of options that will be using by default if option is not provided # for particular storageClass in 'storageClasses' section storageClassDefaults: + # Any of the following options can be specified within the StorageClasses section on a per-storage class basis, + # or can be set here as default values for all storage classes. 
+ + # Secret name, which corresponds to a secret containing credentials to login - must be provided by user + # Secret must contain username, password and endpoint fields. Other fields are ignored. + # Example: kubectl create secret generic vast-mgmt --from-literal=username='< VAST username >' --from-literal=password='< VAST password >' --from-literal=endpoint='< VAST endpoint >' + # Optionally you can include CA ssl certificate. Along with verifySsl option enabled it will establish trusted connection per StorageClass + # if you have different certificates per cluster (Otherwise use `sslCertsSecretName` to specify global secret with ssl certificate to be used across all storage classes): + # Example: + # kubectl create secret generic vast-mgmt \ + # --from-literal=username='' \ + # --from-literal=password='' \ + # --from-literal=endpoint='' \ + # --from-file=ssl_cert='' + secretName: "" + # Secret namespace. If not specified then secret will be searched in the same namespace as StorageClass is created. + secretNamespace: "" # Where volumes will be located on VAST - must be provided by user storagePath: "" - # Name of VAST VIP pool to use - must be provided by user + # Name of VAST VIP pool to use. Must specify either vipPool or vipPoolFQDN. vipPool: "" + # The FQDN of the VIP pool to use. Must specify either vipPool or vipPoolFQDN. + # Using a DNS skips an API call to the VMS for obtaining a random VIP from the vipPool, leading to faster volume mounting. + # NOTE: The driver will prepend the FQDN with a random prefix, which forces the NFS client to resolve into a different VIP, + # thereby distributing the load across the entire range of the VIP pool. 
+ vipPoolFQDN: "" # VAST policy name to create views - must be provided by user viewPolicy: "" # Allows resizing existing volumes allowVolumeExpansion: true - # Load-balancing strategy - # Options: - # roundrobin, random - lbStrategy: roundrobin # If true, sets Vast CSI as the cluster-wide storage class default setDefaultStorageClass: false # String template for CSI-provisioned volume names, within VAST @@ -83,10 +101,12 @@ storageClasses: {} # User can add more storage classes to this section eg: # vastdata-filesystem2: # vipPool: "vippool-2" +# secretName: "secret2" # .... other options # # vastdata-filesystem3: # vipPool: "vippool-3" +# secretName: "secret3" # .... other options #################### @@ -118,14 +138,39 @@ attachRequired: true #################### # VAST CSI SNAPSHOTS CLASS OPTIONS #################### -snapshotClass: - snapshotClassName: "vastdata-snapshot" - # On snapshot delete behavior. By default, Vast Cluster snapshot will be removed as well. - deletionPolicy: "Delete" - # If true, sets Vast CSI as the cluster-wide snapshot class default - setDefaultStorageClass: true +# snapshotClassDefaults is set of options that will be using by default if option is not provided +# for particular snapshotClass in 'snapshotClasses' section +snapshotClassDefaults: + # Any of the following options can be specified within the snapshotClasses section on a per-snapshot class basis, + # or can be set here as default values for all snapshot classes. + + # Secret name, which corresponds to a secret containing credentials to login - must be provided by user + # Secret must contain username, password and endpoint fields. Other fields are ignored. + # Example: kubectl create secret generic vast-mgmt --from-literal=username='< VAST username >' --from-literal=password='< VAST password >' --from-literal=endpoint='< VAST endpoint >' + # Optionally you can include CA ssl certificate. 
Along with verifySsl option enabled it will establish trusted connection per SnapshotClass + # if you have different certificates per cluster (Otherwise use `sslCertsSecretName` to specify global secret with ssl certificate to be used across all snapshot classes): + # Example: + # kubectl create secret generic vast-mgmt \ + # --from-literal=username='' \ + # --from-literal=password='' \ + # --from-literal=endpoint='' \ + # --from-file=ssl_cert='' + secretName: "" + # Secret namespace. If not specified then secret will be searched in the same namespace as SnapshotClass is created. + secretNamespace: "" + # If true, sets SnapshotClass as the cluster-wide snapshot class default + setDefaultSnapshotClass: true # String template for CSI-provisioned snapshot names, within VAST snapshotNameFormat: "csi:{namespace}:{name}:{id}" + # On snapshot delete behavior. By default, Vast Cluster snapshot will be removed as well. + deletionPolicy: "Delete" + +snapshotClasses: {} +# vastdata-snapshot: +# secretName: "secret" +# deletionPolicy: "Delete" +# setDefaultSnapshotClass: false +# snapshotNameFormat: "snapshot:{name}:{id}" #################### @@ -135,7 +180,7 @@ snapshotClass: image: csiVastPlugin: repository: vastdataorg/csi - tag: v2.4.0 # the version of the Vast CSI driver + tag: v2.4.1-beta-1426549 # the version of the Vast CSI driver imagePullPolicy: IfNotPresent csiAttacher: repository: registry.k8s.io/sig-storage/csi-attacher diff --git a/packaging/files/constraints.txt b/packaging/files/constraints.txt index 50ddec17..da94dd83 100644 --- a/packaging/files/constraints.txt +++ b/packaging/files/constraints.txt @@ -19,4 +19,5 @@ real-easypy==0.4.3 psutil==5.6.3 prompt_toolkit==3.0.8 requests==2.28.2 +cryptography==43.0.0 pytest==7.2.0 diff --git a/packaging/files/requirements.txt b/packaging/files/requirements.txt index 297ec456..a714e438 100644 --- a/packaging/files/requirements.txt +++ b/packaging/files/requirements.txt @@ -6,6 +6,7 @@ plumbum real-easypy requests 
psutil +cryptography prompt_toolkit pytest diff --git a/tests/conftest.py b/tests/conftest.py index 9a2e3711..a7ae5d06 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,7 +4,7 @@ from tempfile import gettempdir from contextlib import contextmanager from typing import List, Optional, Any -from unittest.mock import patch, MagicMock +from unittest.mock import MagicMock, patch import pytest from plumbum import local @@ -112,6 +112,16 @@ def __init__(self, # Fixtures # ---------------------------------------------------------------------------------------------------------------------- +@pytest.fixture +def vms_session(monkeypatch, tmpdir): + from vast_csi.server import get_vms_session + from vast_csi.configuration import Config + tmpdir.join("username").write("test") + tmpdir.join("password").write("test") + monkeypatch.setattr(Config, "vms_credentials_store", local.path(tmpdir)) + with patch("vast_csi.vms_session.VmsSession.refresh_auth_token", MagicMock()): + yield get_vms_session() + @pytest.fixture def volume_capabilities(): @@ -144,7 +154,6 @@ def __wrapped( view: Optional[str] = Bunch(path="/test/view", id=1, tenant_id=1) ): session_mock = FakeSession(view=view, quota_id=quota_id, quota_hard_limit=quota_hard_limit) - with patch("vast_csi.server.CsiController.vms_session", session_mock): - yield session_mock + yield session_mock yield __wrapped diff --git a/tests/test_controller.py b/tests/test_controller.py index 7342627f..163012a6 100644 --- a/tests/test_controller.py +++ b/tests/test_controller.py @@ -22,7 +22,7 @@ def test_create_volume_invalid_capability(self, volume_capabilities, fs_type, mo # Execution with pytest.raises(Abort) as ex_context: - cont.CreateVolume("test_volume", capabilities) + cont.CreateVolume(None,"test_volume", capabilities) # Assertion err = ex_context.value @@ -41,26 +41,23 @@ def test_validate_parameters(self, volume_capabilities, parameters, err_message) # Execution with pytest.raises(MissingParameter) as ex_context: - 
cont.CreateVolume(name="test_volume", volume_capabilities=capabilities, parameters=parameters) + cont.CreateVolume(None, name="test_volume", volume_capabilities=capabilities, parameters=parameters) # Assertion err = ex_context.value assert err_message in err.message assert err.code == grpc.StatusCode.INVALID_ARGUMENT - @patch("vast_csi.configuration.Config.vms_user", PropertyMock("test")) - @patch("vast_csi.configuration.Config.vms_password", PropertyMock("test")) - @patch("vast_csi.vms_session.VmsSession.refresh_auth_token", MagicMock()) - def test_local_ip_for_mount(self, volume_capabilities, monkeypatch): + def test_local_ip_for_mount(self, volume_capabilities, vms_session, monkeypatch): # Preparation cont = CsiController() - monkeypatch.setattr(cont.vms_session.config, "use_local_ip_for_mount", "test.com") + monkeypatch.setattr(vms_session.config, "use_local_ip_for_mount", "test.com") data = dict(root_export="/k8s", view_policy="default") capabilities = volume_capabilities(fs_type="ext4", mount_flags="", mode=types.AccessModeType.SINGLE_NODE_WRITER) # Execution with pytest.raises(Abort) as ex_context: - cont.CreateVolume(name="test_volume", volume_capabilities=capabilities, parameters=data) + cont.CreateVolume(vms_session=vms_session, name="test_volume", volume_capabilities=capabilities, parameters=data) # Assertion err = ex_context.value @@ -68,13 +65,13 @@ def test_local_ip_for_mount(self, volume_capabilities, monkeypatch): assert err.code == grpc.StatusCode.INVALID_ARGUMENT # Execution - monkeypatch.setattr( cont.vms_session.config, "use_local_ip_for_mount", "") + monkeypatch.setattr(vms_session.config, "use_local_ip_for_mount", "") with pytest.raises(Abort) as ex_context: - cont.CreateVolume(name="test_volume", volume_capabilities=capabilities, parameters=data) + cont.CreateVolume(vms_session=vms_session, name="test_volume", volume_capabilities=capabilities, parameters=data) # Assertion err = ex_context.value - assert "vip_pool_name or 
use_local_ip_for_mount must be provided" in err.message + assert "either vip_pool_name, vip_pool_fqdn or use_local_ip_for_mount" in err.message assert err.code == grpc.StatusCode.INVALID_ARGUMENT def test_quota_hard_limit_not_match(self, volume_capabilities, fake_session: "FakeSession"): @@ -87,7 +84,7 @@ def test_quota_hard_limit_not_match(self, volume_capabilities, fake_session: "Fa # Execution with pytest.raises(Abort) as ex_context: with fake_session(quota_hard_limit=999) as session: - cont.CreateVolume(name="test_volume", volume_capabilities=capabilities, parameters=parameters) + cont.CreateVolume(vms_session=session, name="test_volume", volume_capabilities=capabilities, parameters=parameters) # Assertion err = ex_context.value @@ -110,20 +107,17 @@ def test_parse_mount_options(self, raw_mount_options): mount_options = ",".join(re.sub(r"[\[\]]", "", raw_mount_options).replace(",", " ").split()) assert mount_options == "vers=4,nolock,proto=tcp,nconnect=4" - @patch("vast_csi.configuration.Config.vms_user", PropertyMock("test")) - @patch("vast_csi.configuration.Config.vms_password", PropertyMock("test")) - @patch("vast_csi.vms_session.VmsSession.refresh_auth_token", MagicMock()) @patch("vast_csi.vms_session.VmsSession.get_quota", MagicMock(return_value=Bunch(tenant_id=1))) @patch("vast_csi.vms_session.VmsSession.get_vip", MagicMock(return_value="2.2.2.2")) @pytest.mark.parametrize("local_ip", ["1.1.1.1", "::1", "2001:0db8:85a3:0000:0000:8a2e:0370:7334"]) @pytest.mark.parametrize("vip_pool_name", ["", "test-vip"]) - def test_publish_volume_with_local_ip(self, volume_capabilities, monkeypatch, local_ip, vip_pool_name): + def test_publish_volume_with_local_ip(self, vms_session, volume_capabilities, monkeypatch, local_ip, vip_pool_name): """ Test if use_local_ip_for_mount is set, it will use local IP for mount (even when vip_pool_name is provided) """ # Preparation cont = CsiController() - conf = cont.vms_session.config + conf = vms_session.config node_id = 
"test-node" volume_id = "test-volume" monkeypatch.setattr(conf, "use_local_ip_for_mount", local_ip), @@ -132,7 +126,7 @@ def test_publish_volume_with_local_ip(self, volume_capabilities, monkeypatch, lo # Execution resp = cont.ControllerPublishVolume( - node_id=node_id, volume_id=volume_id, volume_capability=capabilities[0], volume_context=volume_context + vms_session=vms_session, node_id=node_id, volume_id=volume_id, volume_capability=capabilities[0], volume_context=volume_context ) publish_context = resp.publish_context diff --git a/tests/test_cosi_privisioner.py b/tests/test_cosi_privisioner.py index 70a9898d..63206263 100644 --- a/tests/test_cosi_privisioner.py +++ b/tests/test_cosi_privisioner.py @@ -1,8 +1,8 @@ import pytest import grpc from easypy.bunch import Bunch -from unittest.mock import patch, PropertyMock, MagicMock -from vast_csi.server import CosiProvisioner, Abort, MissingParameter +from unittest.mock import patch, MagicMock +from vast_csi.server import CosiProvisioner, MissingParameter COMMON_PARAMS = dict( @@ -22,9 +22,6 @@ ) -@patch("vast_csi.configuration.Config.vms_user", PropertyMock("test")) -@patch("vast_csi.configuration.Config.vms_password", PropertyMock("test")) -@patch("vast_csi.vms_session.VmsSession.refresh_auth_token", MagicMock()) @patch("vast_csi.vms_session.VmsSession.get_vip", MagicMock(return_value="172.0.0.1")) @patch("vast_csi.vms_session.VmsSession.get_view", MagicMock(return_value=None)) @patch( @@ -36,13 +33,13 @@ MagicMock(return_value=Bunch(id=1, tenant_id=1)), ) class TestCosiProvisionerSuite: - def _create_bucket(self, name, parameters): + def _create_bucket(self, name, parameters, vms_session): cosi = CosiProvisioner() - return cosi.DriverCreateBucket(name=name, parameters=parameters) + return cosi.DriverCreateBucket(name=name, parameters=parameters, vms_session=vms_session) @patch("vast_csi.vms_session.VmsSession.ensure_user") @patch("vast_csi.vms_session.VmsSession.create_view") - def test_create_bucket(self, 
m_create_view, m_ensure_user): + def test_create_bucket(self, m_create_view, m_ensure_user, vms_session): """Test successful bucket creation""" # Preparation cosi = CosiProvisioner() @@ -51,7 +48,7 @@ def test_create_bucket(self, m_create_view, m_ensure_user): # Execution params = COMMON_PARAMS.copy() - res = self._create_bucket(name=bucket_name, parameters=params) + res = self._create_bucket(name=bucket_name, parameters=params, vms_session=vms_session) # Assertion assert res.bucket_id == "test-bucket@1@http://172.0.0.1:80" @@ -86,7 +83,7 @@ def test_create_bucket(self, m_create_view, m_ensure_user): @pytest.mark.parametrize("root_export", ["", "/"]) @patch("vast_csi.vms_session.VmsSession.ensure_user", MagicMock()) @patch("vast_csi.vms_session.VmsSession.create_view") - def test_create_bucket_with_root_storage_path(self, m_create_view, root_export): + def test_create_bucket_with_root_storage_path(self, m_create_view, root_export, vms_session): """Test successful bucket creation with root storage path""" # Preparation common_params = COMMON_PARAMS.copy() @@ -94,7 +91,7 @@ def test_create_bucket_with_root_storage_path(self, m_create_view, root_export): bucket_name = "test-bucket" # Execution - res = self._create_bucket(name=bucket_name, parameters=common_params) + res = self._create_bucket(name=bucket_name, parameters=common_params, vms_session=vms_session) # Assertion create_view_kwargs = m_create_view.call_args.kwargs @@ -102,12 +99,12 @@ def test_create_bucket_with_root_storage_path(self, m_create_view, root_export): @patch("vast_csi.vms_session.VmsSession.ensure_user", MagicMock()) @patch("vast_csi.vms_session.VmsSession.create_view") - def test_create_bucket_only_required_params(self, m_create_view): + def test_create_bucket_only_required_params(self, m_create_view, vms_session): params = dict(root_export="/buckets", vip_pool_name="vippool-1") bucket_name = "test-bucket" # Execution - self._create_bucket(name=bucket_name, parameters=params) + 
self._create_bucket(name=bucket_name, parameters=params, vms_session=vms_session) # Assertion assert m_create_view.call_args.kwargs == { @@ -120,7 +117,7 @@ def test_create_bucket_only_required_params(self, m_create_view): } @pytest.mark.parametrize("missing_param", ["root_export", "vip_pool_name"]) - def test_create_bucket_missing_required_params(self, missing_param): + def test_create_bucket_missing_required_params(self, missing_param, vms_session): """Test missing required parameters""" # Preparation params = COMMON_PARAMS.copy() @@ -129,7 +126,7 @@ def test_create_bucket_missing_required_params(self, missing_param): # Execution with pytest.raises(MissingParameter) as ex_context: - self._create_bucket(name=bucket_name, parameters=params) + self._create_bucket(name=bucket_name, parameters=params, vms_session=vms_session) # Assertion err = ex_context.value diff --git a/tests/test_vms_session.py b/tests/test_vms_session.py index 9aa57e31..d7e961da 100644 --- a/tests/test_vms_session.py +++ b/tests/test_vms_session.py @@ -15,10 +15,9 @@ class TestVmsSessionSuite: @pytest.mark.parametrize("cluster_version", [ "4.3.9", "4.0.11.12", "3.4.6.123.1", "4.5.6-1", "4.6.0", "4.6.0-1", "4.6.0-1.1", "4.6.9" ]) - def test_requisite_decorator(self, cluster_version): + def test_requisite_decorator(self, cluster_version, vms_session): """Test `requisite` decorator produces exception when cluster version doesn't met requirements""" # Preparation - cont = CsiController() fake_mgmt = PropertyMock(return_value=SemVer.loads_fuzzy(cluster_version)) stripped_version = SemVer.loads_fuzzy(cluster_version).dumps() @@ -33,33 +32,31 @@ def raise_http_err(*args, **kwargs): # Execution with patch("vast_csi.vms_session.VmsSession.sw_version", fake_mgmt): with pytest.raises(OperationNotSupported) as exc: - cont.vms_session.delete_folder("/abc", 1) + vms_session.delete_folder("/abc", 1) # Assertion assert f"Cluster does not support this operation - 'delete_folder'" \ f" (needs 4.7-0, got 
{stripped_version})\n current_version = {stripped_version}\n" \ f" op = delete_folder\n required_version = 4.7-0" in exc.value.render(color=False) - def test_trash_api_disabled_helm_config(self): + def test_trash_api_disabled_helm_config(self, vms_session): """Test trash api disable in helm chart cause Exception""" # Preparation - cont = CsiController() - cont.vms_session.config.dont_use_trash_api = True + vms_session.config.dont_use_trash_api = True fake_mgmt = PropertyMock(return_value=SemVer.loads_fuzzy("4.7.0")) # Execution with patch("vast_csi.vms_session.VmsSession.sw_version", fake_mgmt): with pytest.raises(OperationNotSupported) as exc: - cont.vms_session.delete_folder("/abc", 1) + vms_session.delete_folder("/abc", 1) # Assertion assert "Cannot delete folder via VMS: Disabled by Vast CSI settings" in exc.value.render(color=False) - def test_trash_api_disabled_cluster_settings(self): + def test_trash_api_disabled_cluster_settings(self, vms_session): """Test trash api disable on cluster cause Exception""" # Preparation - cont = CsiController() - cont.vms_session.config.dont_use_trash_api = True + vms_session.config.dont_use_trash_api = True fake_mgmt = PropertyMock(return_value=SemVer.loads_fuzzy("5.0.0.25")) def raise_http_err(*args, **kwargs): @@ -74,32 +71,32 @@ def raise_http_err(*args, **kwargs): patch("vast_csi.vms_session.VmsSession.delete", side_effect=raise_http_err), ): with pytest.raises(OperationNotSupported) as exc: - cont.vms_session.delete_folder("/abc", 1) + vms_session.delete_folder("/abc", 1) # Assertion assert "Cannot delete folder via VMS: Disabled by Vast CSI settings" in exc.value.render(color=False) - def test_delete_folder_local_mounting_requires_configuration(self): + def test_delete_folder_local_mounting_requires_configuration(self, vms_session): """Test deleting the folder via local mounting requires deletionVipPool and deletionVipPolicy to be provided.""" # Preparation cont = CsiController() - 
cont.vms_session.config.dont_use_trash_api = True + vms_session.config.dont_use_trash_api = True fake_mgmt = PropertyMock(return_value=SemVer.loads_fuzzy("4.6.0")) # Execution with patch("vast_csi.vms_session.VmsSession.sw_version", fake_mgmt): with pytest.raises(AssertionError) as exc: - cont._delete_data_from_storage("/abc", 1) + cont._delete_data_from_storage(vms_session, "/abc", 1) # Assertion assert "Ensure that deletionViewPolicy is properly configured" in str(exc.value) - def test_delete_folder_unsuccesful_attempt_cache_result(self): + def test_delete_folder_unsuccesful_attempt_cache_result(self, vms_session): """Test if Trash API has been failed it wont be executed second time.""" # Preparation cont = CsiController() - cont.vms_session.config.dont_use_trash_api = False - cont.vms_session.config.avoid_trash_api.reset(-1) + vms_session.config.dont_use_trash_api = False + vms_session.config.avoid_trash_api.reset(-1) fake_mgmt = PropertyMock(return_value=SemVer.loads_fuzzy("4.7.0")) # Execution @@ -109,29 +106,29 @@ def raise_http_err(*args, **kwargs): resp.raw = BytesIO(b"trash folder disabled") raise ApiError(response=resp) - assert cont.vms_session.config.avoid_trash_api.expired + assert vms_session.config.avoid_trash_api.expired # Execution with ( patch("vast_csi.vms_session.VmsSession.sw_version", fake_mgmt), patch("vast_csi.vms_session.VmsSession.delete", side_effect=raise_http_err) as mocked_request, ): with pytest.raises(AssertionError) as exc: - cont._delete_data_from_storage("/abc", 1) + cont._delete_data_from_storage(vms_session, "/abc", 1) assert mocked_request.call_count == 1 - assert not cont.vms_session.config.avoid_trash_api.expired + assert not vms_session.config.avoid_trash_api.expired with pytest.raises(AssertionError) as exc: - cont._delete_data_from_storage("/abc", 1) + cont._delete_data_from_storage(vms_session,"/abc", 1) assert mocked_request.call_count == 1 - assert not cont.vms_session.config.avoid_trash_api.expired + assert not 
vms_session.config.avoid_trash_api.expired # reset timer. trash API should be executed again - cont.vms_session.config.avoid_trash_api.reset(-1) + vms_session.config.avoid_trash_api.reset(-1) with pytest.raises(AssertionError) as exc: - cont._delete_data_from_storage("/abc", 1) + cont._delete_data_from_storage(vms_session,"/abc", 1) assert mocked_request.call_count == 2 - assert not cont.vms_session.config.avoid_trash_api.expired + assert not vms_session.config.avoid_trash_api.expired diff --git a/vast_csi/configuration.py b/vast_csi/configuration.py index 642132ed..bfc44ec2 100644 --- a/vast_csi/configuration.py +++ b/vast_csi/configuration.py @@ -10,6 +10,7 @@ NODE, COSI_PLUGIN ) +from .exceptions import LookupFieldError from easypy.caching import cached_property from easypy.timing import Timer @@ -41,7 +42,6 @@ class Path(TypedEnv.Str): vms_host = TypedEnv.Str("X_CSI_VMS_HOST", default="vast") ssl_verify = TypedEnv.Bool("X_CSI_ENABLE_VMS_SSL_VERIFICATION", default=False) - load_balancing = TypedEnv.Str("X_CSI_LB_STRATEGY", default="roundrobin") truncate_volume_name = TypedEnv.Int("X_CSI_TRUNCATE_VOLUME_NAME", default=None) worker_threads = TypedEnv.Int("X_CSI_WORKER_THREADS", default=10) dont_use_trash_api = TypedEnv.Bool("X_CSI_DONT_USE_TRASH_API", default=False) @@ -60,10 +60,20 @@ class Path(TypedEnv.Str): @cached_property def vms_user(self): + if not self.vms_credentials_store['username'].exists(): + raise LookupFieldError( + field="username", + tip="Make sure username is present in global VMS credentials secret" + ) return self.vms_credentials_store['username'].read().strip() @cached_property def vms_password(self): + if not self.vms_credentials_store['password'].exists(): + raise LookupFieldError( + field="password", + tip="Make sure password is present in global VMS credentials secret" + ) return self.vms_credentials_store['password'].read().strip() @property diff --git a/vast_csi/exceptions.py b/vast_csi/exceptions.py index 0fcc4937..b921b282 100644 --- 
a/vast_csi/exceptions.py +++ b/vast_csi/exceptions.py @@ -20,6 +20,10 @@ class OperationNotSupported(TException): template = "Cluster does not support this operation - {op!r} (needs {required_version}, got {current_version})" +class LookupFieldError(TException): + template = "Could not find {field}." + + class MissingParameter(Abort): def __init__(self, param: str): self.param = param diff --git a/vast_csi/server.py b/vast_csi/server.py index dbc559a7..39aeb3a8 100644 --- a/vast_csi/server.py +++ b/vast_csi/server.py @@ -35,13 +35,13 @@ from easypy.caching import cached_property from easypy.bunch import Bunch from easypy.exceptions import TException +from easypy.collections import separate from .logging import logger, init_logging from .utils import ( patch_traceback_format, get_mount, normalize_mount_options, - parse_load_balancing_strategy, string_to_proto_timestamp, is_valid_ip, ) @@ -56,9 +56,10 @@ MountFailed, VolumeAlreadyExists, SourceNotFound, - OperationNotSupported + OperationNotSupported, + LookupFieldError, ) -from .vms_session import VmsSession, TestVmsSession +from .vms_session import get_vms_session, VmsSession from .configuration import Config @@ -120,18 +121,6 @@ def _validate_capabilities(capabilities): f"Unsupported file system type: {capability.mount.fs_type}", ) - -class SessionMixin: - - @cached_property - def vms_session(self): - session_class = TestVmsSession if CONF.mock_vast else VmsSession - session = session_class(config=CONF) - logger.info("VMS ssl verification {}.".format("enabled" if CONF.ssl_verify else "disabled")) - session.refresh_auth_token() - return session - - class Instrumented: SILENCED = ["Probe", "NodeGetCapabilities"] @@ -143,9 +132,10 @@ def logged(cls, func): log = logger.debug if (method in cls.SILENCED) else logger.info parameters = inspect.signature(func).parameters - required_params = { - name for name, p in parameters.items() if p.default is p.empty - } + required_params, non_required_params = map( + set, 
separate(parameters, key=lambda k: parameters[k].default is inspect._empty) + ) + vms_session_args = inspect.signature(get_vms_session).parameters.keys() required_params.discard("self") func = kwargs_resilient(func) @@ -154,17 +144,30 @@ def wrapper(self, request, context): peer = context.peer() params = {fld.name: value for fld, value in request.ListFields()} - missing = required_params - {"request", "context"} - set(params) + # secrets are not logged and are not part of the function signature. + secrets = params.pop("secrets", {}) + missing_params = required_params - {"request", "context", "vms_session"} - set(params) log(f"{peer} >>> {method}:") if params: for line in pformat(params).splitlines(): - log(f" {line}") + log(f"({method}) {line}") + + if "vms_session" in required_params: + # If secrets exist and the method signature requires `vms_session`, + # then a `vms_session` built from the secrets will be injected into the function parameters + params["vms_session"] = get_vms_session(**{k: secrets.get(k) for k in vms_session_args}) + elif "vms_session" in non_required_params: + # Try to build vms_session from the secrets. Set None on error.
+ try: + params["vms_session"] = get_vms_session(**{k: secrets.get(k) for k in vms_session_args}) + except LookupFieldError: + params["vms_session"] = None try: - if missing: - msg = f'Missing required fields: {", ".join(sorted(missing))}' + if missing_params: + msg = f'Missing required fields: {", ".join(sorted(missing_params))}' logger.error(f"{peer} <<< {method}: {msg}") raise Abort(INVALID_ARGUMENT, msg) @@ -240,18 +243,8 @@ def GetPluginCapabilities(self, request, context): ) def Probe(self, request, context): - if self.node: - return types.ProbeRespOK - elif CONF.mock_vast: - return types.ProbeRespOK - elif self.controller: - try: - self.controller.vms_session.versions(status="success", log_result=False) - except ApiError as exc: - raise Abort(FAILED_PRECONDITION, str(exc)) - return types.ProbeRespOK - else: - return types.ProbeRespNotReady + return types.ProbeRespOK + ################################################################ @@ -261,7 +254,7 @@ def Probe(self, request, context): ################################################################ -class CsiController(csi_grpc.ControllerServicer, Instrumented, SessionMixin): +class CsiController(csi_grpc.ControllerServicer, Instrumented): CAPABILITIES = [ types.CtrlCapabilityType.CREATE_DELETE_VOLUME, @@ -285,13 +278,14 @@ def ControllerGetCapabilities(self): def ValidateVolumeCapabilities( self, + vms_session, context, volume_id, volume_capabilities, volume_context=None, parameters=None, ): - if not self.vms_session.get_quota(volume_id): + if not vms_session.get_quota(volume_id): raise Abort(NOT_FOUND, f"Volume {volume_id} does not exist") try: _validate_capabilities(volume_capabilities) @@ -308,6 +302,7 @@ def ValidateVolumeCapabilities( def CreateVolume( self, + vms_session, name, volume_capabilities, capacity_range=None, @@ -333,7 +328,7 @@ def CreateVolume( ) # Take appropriate builder for volume, snapshot or test builder if CONF.mock_vast: - root_export = volume_name_fmt = lb_strategy = view_policy = 
vip_pool_name = mount_options = qos_policy = "" + root_export = volume_name_fmt = view_policy = vip_pool_name = vip_pool_fqdn = mount_options = qos_policy = "" builder = TestVolumeBuilder else: @@ -341,13 +336,23 @@ def CreateVolume( raise MissingParameter(param="root_export") if not (view_policy := parameters.get("view_policy")): raise MissingParameter(param="view_policy") - if not (vip_pool_name := parameters.get("vip_pool_name", "")): + + vip_pool_fqdn = parameters.get("vip_pool_fqdn") + vip_pool_name = parameters.get("vip_pool_name") + if vip_pool_name and vip_pool_fqdn: + raise Abort( + INVALID_ARGUMENT, + "vip_pool_name and vip_pool_fqdn are mutually exclusive. Provide one of them." + ) + elif not (vip_pool_name or vip_pool_fqdn): if not CONF.use_local_ip_for_mount: - raise Abort(INVALID_ARGUMENT, "vip_pool_name or use_local_ip_for_mount must be provided") + raise Abort( + INVALID_ARGUMENT, + "either vip_pool_name, vip_pool_fqdn or use_local_ip_for_mount must be provided." + ) elif not is_valid_ip(CONF.use_local_ip_for_mount): raise Abort(INVALID_ARGUMENT, f"Local IP address: {CONF.use_local_ip_for_mount} is invalid") volume_name_fmt = parameters.get("volume_name_fmt", CONF.name_fmt) - lb_strategy = parameters.get("lb_strategy", CONF.load_balancing) qos_policy = parameters.get("qos_policy") if not volume_content_source: @@ -368,7 +373,7 @@ def CreateVolume( # Create volume, volume from snapshot or mount local path (for testing purposes) # depends on chosen builder. 
builder = builder( - controller=self, + vms_session=vms_session, configuration=CONF, name=name, rw_access_mode=rw_access_mode, @@ -381,8 +386,8 @@ def CreateVolume( volume_name_fmt=volume_name_fmt, view_policy=view_policy, vip_pool_name=vip_pool_name, + vip_pool_fqdn=vip_pool_fqdn, mount_options=mount_options, - lb_strategy=lb_strategy, qos_policy=qos_policy, ) try: @@ -393,11 +398,11 @@ def CreateVolume( raise Abort(ALREADY_EXISTS, exc.message) return types.CreateResp(volume=volume) - def _delete_data_from_storage(self, path, tenant_id): + def _delete_data_from_storage(self, vms_session, path, tenant_id): if CONF.avoid_trash_api.expired: try: logger.info(f"Attempting trash API to delete {path}") - self.vms_session.delete_folder(path, tenant_id) + vms_session.delete_folder(path, tenant_id) return # Successfully deleted. Prevent using local mounting except OperationNotSupported as exc: logger.debug(f"Trash API not available {exc}") @@ -410,7 +415,7 @@ def _delete_data_from_storage(self, path, tenant_id): "Ensure that deletionViewPolicy is properly " "configured in your Helm configuration to perform local volume deletion." ) - view_policy = self.vms_session.get_view_policy(policy_name=CONF.deletion_view_policy) + view_policy = vms_session.get_view_policy(policy_name=CONF.deletion_view_policy) assert tenant_id == view_policy.tenant_id, ( f"Volume and deletionViewPolicy must be in the same tenant. " f"Make sure deletionViewPolicy belongs to tenant {tenant_id} or use Trash API for deletion." @@ -422,10 +427,10 @@ def _delete_data_from_storage(self, path, tenant_id): "Ensure that deletionVipPool is properly " "configured in your Helm configuration to perform local volume deletion." 
) - nfs_server_ip = self.vms_session.get_vip(CONF.deletion_vip_pool, view_policy.tenant_id) + nfs_server_ip = vms_session.get_vip(CONF.deletion_vip_pool, view_policy.tenant_id) logger.info(f"Creating temporary base view.") - with self.vms_session.temp_view(path.dirname, view_policy.id, view_policy.tenant_id) as base_view: + with vms_session.temp_view(path.dirname, view_policy.id, view_policy.tenant_id) as base_view: mount_spec = f"{nfs_server_ip}:{base_view.alias}" mounted = False tmpdir = local.path(mkdtemp()) # convert string to local.path @@ -470,36 +475,36 @@ def _delete_data_from_storage(self, path, tenant_id): os.remove(tmpdir['.csi-unmounted']) # will fail if still mounted somehow os.rmdir(tmpdir) # will fail if not empty directory - def DeleteVolume(self, volume_id): - self.vms_session.ensure_snapshot_stream_deleted(f"strm-{volume_id}") - if quota := self.vms_session.get_quota(volume_id): + def DeleteVolume(self, vms_session, volume_id): + vms_session.ensure_snapshot_stream_deleted(f"strm-{volume_id}") + if quota := vms_session.get_quota(volume_id): # this is a check we have to do until Vast provides access to orphaned snapshots (ORION-135599) might_use_trash_folder = not CONF.dont_use_trash_api - if might_use_trash_folder and self.vms_session.has_snapshots(quota.path): + if might_use_trash_folder and vms_session.has_snapshots(quota.path): raise Exception(f"Unable to delete {volume_id} as it holds snapshots") try: - self._delete_data_from_storage(quota.path, quota.tenant_id) + self._delete_data_from_storage(vms_session, quota.path, quota.tenant_id) except OSError as exc: if 'not empty' not in str(exc): raise - if snaps := self.vms_session.has_snapshots(quota.path): + if snaps := vms_session.has_snapshots(quota.path): # this is expected when the volume has snapshots logger.info(f"{quota.path} will remain due to remaining {len(snaps)} snapshots") else: raise logger.info(f"Data removed: {quota.path}") - self.vms_session.delete_view_by_path(quota.path) + 
vms_session.delete_view_by_path(quota.path) logger.info(f"View removed: {quota.path}") - self.vms_session.delete_quota(quota.id) + vms_session.delete_quota(quota.id) logger.info(f"Quota removed: {quota.id}") logger.info(f"Removed volume: {volume_id}") return types.DeleteResp() def ControllerPublishVolume( - self, node_id, volume_id, volume_capability, volume_context=None + self, vms_session, node_id, volume_id, volume_capability, volume_context=None ): volume_context = volume_context or dict() _validate_capabilities([volume_capability]) @@ -508,17 +513,12 @@ def ControllerPublishVolume( root_export = CONF.sanity_test_nfs_export else: root_export = local.path(volume_context["root_export"]) - - load_balancing = parse_load_balancing_strategy(volume_context.get("load_balancing", CONF.load_balancing)) - # Build export path for snapshot or volume if snapshot_base_path := volume_context.get("snapshot_base_path"): # Snapshot - quota_path_fragment = snapshot_base_path.split("/")[0] export_path = str(root_export[snapshot_base_path]) else: # Volume - quota_path_fragment = volume_id export_path = str(root_export[volume_id]) if CONF.csi_sanity_test and CONF.node_id != node_id: @@ -526,16 +526,14 @@ def ControllerPublishVolume( raise Abort(NOT_FOUND, f"Unknown volume: {node_id}") vip_pool_name = volume_context.get("vip_pool_name") - if vip_pool_name or CONF.mock_vast: - if not (quota := self.vms_session.get_quota(quota_path_fragment)): - raise Abort(NOT_FOUND, f"Unknown volume: {quota_path_fragment}") - nfs_server_ip = self.vms_session.get_vip( - vip_pool_name=vip_pool_name, - load_balancing=load_balancing, - tenant_id=quota.tenant_id, - ) + vip_pool_fqdn = volume_context.get("vip_pool_fqdn") + if vip_pool_fqdn: + nfs_server_ip = vip_pool_fqdn + elif vip_pool_name or CONF.mock_vast: + nfs_server_ip = vms_session.get_vip(vip_pool_name=vip_pool_name) else: nfs_server_ip = CONF.use_local_ip_for_mount + assert nfs_server_ip, f"{nfs_server_ip=}" logger.info(f"Using local IP for 
mount: {nfs_server_ip}") return types.CtrlPublishResp( @@ -548,10 +546,10 @@ def ControllerPublishVolume( def ControllerUnpublishVolume(self, node_id, volume_id): return types.CtrlUnpublishResp() - def ControllerExpandVolume(self, volume_id, capacity_range): + def ControllerExpandVolume(self, vms_session, volume_id, capacity_range): requested_capacity = capacity_range.required_bytes - if not (quota := self.vms_session.get_quota(volume_id)): + if not (quota := vms_session.get_quota(volume_id)): raise Abort(NOT_FOUND, f"Not found quota with id: {volume_id}") existing_capacity = quota.hard_limit @@ -559,7 +557,7 @@ def ControllerExpandVolume(self, volume_id, capacity_range): capacity_bytes = existing_capacity else: try: - self.vms_session.update_quota( + vms_session.update_quota( quota_id=quota.id, data=dict(hard_limit=requested_capacity) ) except ApiError as exc: @@ -571,11 +569,11 @@ def ControllerExpandVolume(self, volume_id, capacity_range): node_expansion_required=False, ) - def CreateSnapshot(self, source_volume_id, name, parameters=None): + def CreateSnapshot(self, vms_session, source_volume_id, name, parameters=None): parameters = parameters or dict() volume_id = source_volume_id - if not (quota := self.vms_session.get_quota(volume_id)): + if not (quota := vms_session.get_quota(volume_id)): raise Abort(NOT_FOUND, f"Unknown volume: {volume_id}") if CONF.mock_vast: @@ -614,7 +612,7 @@ def CreateSnapshot(self, source_volume_id, name, parameters=None): ) snapshot_name = snapshot_name.replace(":", "-").replace("/", "-") try: - snap = self.vms_session.ensure_snapshot(snapshot_name=snapshot_name, path=path, tenant_id=tenant_id) + snap = vms_session.ensure_snapshot(snapshot_name=snapshot_name, path=path, tenant_id=tenant_id) except ApiError as exc: handled = False if exc.response.status_code == 400: @@ -624,7 +622,7 @@ def CreateSnapshot(self, source_volume_id, name, parameters=None): pass else: if (k, v) == ("name", "This field must be unique."): - snap = 
self.vms_session.get_snapshot(snapshot_name=snapshot_name) + snap = vms_session.get_snapshot(snapshot_name=snapshot_name) if snap.path.strip("/") != path.strip("/"): raise Abort( ALREADY_EXISTS, @@ -645,19 +643,19 @@ def CreateSnapshot(self, source_volume_id, name, parameters=None): return types.CreateSnapResp(snapshot=snp) - def DeleteSnapshot(self, snapshot_id): + def DeleteSnapshot(self, vms_session, snapshot_id): if CONF.mock_vast: CONF.fake_snapshot_store[snapshot_id].delete() else: - snapshot = self.vms_session.get_snapshot(snapshot_id=snapshot_id) - self.vms_session.delete_snapshot(snapshot_id) - if self.vms_session.get_quotas_by_path(snapshot.path): + snapshot = vms_session.get_snapshot(snapshot_id=snapshot_id) + vms_session.delete_snapshot(snapshot_id) + if vms_session.get_quotas_by_path(snapshot.path): pass # quotas still exist - elif self.vms_session.has_snapshots(snapshot.path): + elif vms_session.has_snapshots(snapshot.path): pass # other snapshots still exist else: logger.info(f"last snapshot for {snapshot.path}, and no more quotas - let's delete this directory") - self._delete_data_from_storage(snapshot.path, snapshot.tenant_id) + self._delete_data_from_storage(vms_session, snapshot.path, snapshot.tenant_id) return types.DeleteSnapResp() @@ -695,6 +693,7 @@ def NodeGetCapabilities(self): def NodePublishVolume( self, + vms_session, volume_id, target_path, volume_capability=None, @@ -723,6 +722,7 @@ def NodePublishVolume( ) self.controller.CreateVolume.__wrapped__( self.controller, + vms_session=vms_session, name=volume_id, volume_capabilities=[], ephemeral_volume_name=eph_volume_name, @@ -731,6 +731,7 @@ def NodePublishVolume( ) resp = self.controller.ControllerPublishVolume.__wrapped__( self.controller, + vms_session=vms_session, node_id=CONF.node_id, volume_id=volume_id, volume_capability=volume_capability, @@ -745,6 +746,7 @@ def NodePublishVolume( logger.info("attach_required is disabled, obtaining publish context") resp = 
self.controller.ControllerPublishVolume.__wrapped__( self.controller, + vms_session=vms_session, node_id=CONF.node_id, volume_id=volume_id, volume_capability=volume_capability, @@ -785,8 +787,12 @@ def NodePublishVolume( target_path.mkdir() meta_file = target_path[".vast-csi-meta"] + payload = dict(volume_id=volume_id, is_ephemeral=is_ephemeral) + if is_ephemeral: + payload["vms_session"] = vms_session.serialize(salt=volume_id.encode()) with meta_file.open("w") as f: - json.dump(dict(volume_id=volume_id, is_ephemeral=is_ephemeral), f) + json.dump(payload, f) + os.chmod(meta_file, 0o600) logger.info(f"created: {target_path}") flags = ["ro"] if readonly else [] @@ -803,7 +809,7 @@ def NodePublishVolume( return types.NodePublishResp() - def NodeUnpublishVolume(self, target_path): + def NodeUnpublishVolume(self, volume_id, target_path, vms_session=None): target_path = local.path(target_path) if not target_path.exists(): @@ -832,8 +838,19 @@ def NodeUnpublishVolume(self, target_path): with target_path[".vast-csi-meta"].open("r") as f: meta = json.load(f) if meta.get("is_ephemeral"): - controller = CsiController() - controller.DeleteVolume.__wrapped__(controller, meta["volume_id"]) + if vms_session_data := meta.get("vms_session"): + vms_session = VmsSession.deserialize( + salt=volume_id.encode(), encrypted_data=vms_session_data + ) + elif not vms_session: + raise Abort( + FAILED_PRECONDITION, + "Ephemeral Volume provisioning requires " + "configuring a global VMS credentials secret or nodePublishSecretRef secret reference." 
+ ) + self.controller.DeleteVolume.__wrapped__( + self.controller, vms_session=vms_session, volume_id=meta["volume_id"] + ) if target_path[".vast-csi-meta"].exists(): os.remove(target_path[".vast-csi-meta"]) @@ -873,9 +890,9 @@ def DriverGetInfo(self, request, context): return types.DriverGetInfoResp(name=CONF.plugin_name) -class CosiProvisioner(cosi_grpc.ProvisionerServicer, Instrumented, SessionMixin): +class CosiProvisioner(cosi_grpc.ProvisionerServicer, Instrumented): - def DriverCreateBucket(self, name, parameters): + def DriverCreateBucket(self, vms_session, name, parameters): if (root_export := parameters.pop("root_export", None)) is None: raise MissingParameter(param="root_export") if not (vip_pool_name := parameters.pop("vip_pool_name", None)): @@ -886,10 +903,10 @@ def DriverCreateBucket(self, name, parameters): name = name[:CONF.truncate_volume_name] # crop to Vast's max-length uid = randint(50000, 60000) - self.vms_session.ensure_user(uid=uid, name=name, allow_create_bucket=True) - view = self.vms_session.ensure_s3view(bucket_name=name, root_export=root_export, **parameters) + vms_session.ensure_user(uid=uid, name=name, allow_create_bucket=True) + view = vms_session.ensure_s3view(bucket_name=name, root_export=root_export, **parameters) port = 443 if scheme == "https" else 80 - vip = self.vms_session.get_vip(vip_pool_name=vip_pool_name, tenant_id=view.tenant_id) + vip = vms_session.get_vip(vip_pool_name=vip_pool_name, tenant_id=view.tenant_id) # bucket_id contains bucket name and endpoint # should be smth like test-bucket-caf9e0d0-0b9a-4b5e-8b0a-9b0brb0b4c0c@1@https://172.0.0.1:443 return types.DriverCreateBucketResp( @@ -902,19 +919,19 @@ def DriverCreateBucket(self, name, parameters): ) ) - def DriverDeleteBucket(self, bucket_id, delete_context): + def DriverDeleteBucket(self, vms_session, bucket_id, delete_context): bucket_id, _, _ = bucket_id.split('@') - if view := self.vms_session.get_view(bucket=bucket_id): - 
self.vms_session.delete_folder(view.path, view.tenant_id) - self.vms_session.delete_view_by_id(view.id) - if user := self.vms_session.get_user(bucket_id): - self.vms_session.delete_user(user.id) + if view := vms_session.get_view(bucket=bucket_id): + vms_session.delete_folder(view.path, view.tenant_id) + vms_session.delete_view_by_id(view.id) + if user := vms_session.get_user(bucket_id): + vms_session.delete_user(user.id) return types.DriverDeleteBucketResp() - def DriverGrantBucketAccess(self, bucket_id, name): + def DriverGrantBucketAccess(self, vms_session, bucket_id, name): bucket_id, _, endpoint = bucket_id.split('@') - user = self.vms_session.get_user(bucket_id) - creds = self.vms_session.generate_access_key(user.id) + user = vms_session.get_user(bucket_id) + creds = vms_session.generate_access_key(user.id) credentials = dict( s3=types.CredentialDetails( secrets={"accessKeyID": creds.access_key, "accessSecretKey": creds.secret_key, "endpoint": endpoint} @@ -925,10 +942,10 @@ def DriverGrantBucketAccess(self, bucket_id, name): credentials=credentials ) - def DriverRevokeBucketAccess(self, bucket_id, account_id): + def DriverRevokeBucketAccess(self, vms_session, bucket_id, account_id): bucket_id, _, _ = bucket_id.split('@') - if user := self.vms_session.get_user(bucket_id): - self.vms_session.delete_access_key(user.id, account_id) + if user := vms_session.get_user(bucket_id): + vms_session.delete_access_key(user.id, account_id) return types.DriverRevokeBucketAccessResp() diff --git a/vast_csi/utils.py b/vast_csi/utils.py index 29b8d3cc..bb9b23d4 100644 --- a/vast_csi/utils.py +++ b/vast_csi/utils.py @@ -5,18 +5,9 @@ from plumbum import local from easypy.caching import locking_cache - -from easypy.tokens import ( - Token, - ROUNDROBIN, - RANDOM, -) from . 
import csi_types as types -LOAD_BALANCING_STRATEGIES = {ROUNDROBIN, RANDOM} - - PATH_ALIASES = { re.compile('.*/site-packages'): '*', re.compile("%s/" % local.cwd): '' @@ -108,16 +99,6 @@ def patch_traceback_format(): orig_format_traceback, StackSummary.format = StackSummary.format, nice_format_traceback -def parse_load_balancing_strategy(load_balancing: str): - """Convert load balancing to 'Token' representation.""" - lb = Token(load_balancing.upper()) - if lb not in LOAD_BALANCING_STRATEGIES: - raise Exception( - f"invalid load balancing strategy: {lb} (use {'|'.join(LOAD_BALANCING_STRATEGIES)})" - ) - return lb - - def normalize_mount_options(mount_options: str): """Convert mount options to list if mount options were provided as string on StorageClass parameters level.""" s = mount_options.strip() diff --git a/vast_csi/vms_session.py b/vast_csi/vms_session.py index 85a57456..3ddd14c0 100644 --- a/vast_csi/vms_session.py +++ b/vast_csi/vms_session.py @@ -1,36 +1,34 @@ import os import json import requests +import hashlib +import pickle +import base64 from pprint import pformat -from typing import ClassVar from uuid import uuid4 from contextlib import contextmanager from datetime import datetime from requests.exceptions import ConnectionError from requests.utils import default_user_agent +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.backends import default_backend from easypy.bunch import Bunch from easypy.caching import cached_property from easypy.collections import shuffled -from easypy.misc import at_least from easypy.semver import SemVer -from easypy.caching import timecache -from easypy.units import HOUR +from easypy.caching import timecache, locking_cache +from easypy.units import HOUR, MINUTE from easypy.resilience import retrying, resilient -from easypy.tokens import ( - ROUNDROBIN, - RANDOM, - CONTROLLER_AND_NODE, - CONTROLLER, - NODE, -) from 
easypy.humanize import yesno_to_bool from plumbum import cmd from plumbum import local, ProcessExecutionError from .logging import logger -from .exceptions import ApiError, MountFailed, OperationNotSupported -from .utils import parse_load_balancing_strategy, generate_ip_range +from .configuration import Config +from .exceptions import ApiError, MountFailed, OperationNotSupported, LookupFieldError +from .utils import generate_ip_range from . import csi_types as types @@ -69,6 +67,20 @@ class CannotUseTrashAPI(OperationNotSupported): template = "Cannot delete folder via VMS: {reason}" +def _derive_key(salt): + # Derive a key from the salt + kdf = hashes.Hash(hashes.SHA256(), backend=default_backend()) + kdf.update(salt) + return kdf.finalize() + + +@locking_cache +def get_vms_session(username=None, password=None, endpoint=None, ssl_cert=None): + config = Config() + session_cls = TestVmsSession if config.mock_vast else VmsSession + return session_cls.create(config=config, username=username, password=password, endpoint=endpoint, ssl_cert=ssl_cert) + + class RESTSession(requests.Session): def __init__(self, config): super().__init__() @@ -76,37 +88,7 @@ def __init__(self, config): self.headers["Accept"] = "application/json" self.headers["Content-Type"] = "application/json" self.headers["User-Agent"] = f"VastCSI/{config.plugin_version}.{config.ci_pipe}.{config.git_commit[:10]} ({config._mode.capitalize()}) {default_user_agent()}" - self.base_url = f"https://{self.config.vms_host}/api/v1" - # Modify the SSL verification CA bundle path established - # by the underlying Certifi library's defaults if ssl_verify==True. - # This way requests library can use mounted CA bundle or default system CA bundle under the same path. 
- self.ssl_verify = (False, "/etc/ssl/certs/ca-certificates.crt")[self.config.ssl_verify] - - def refresh_auth_token(self): - try: - resp = super().request( - "POST", f"{self.base_url}/token/", verify=self.ssl_verify, timeout=5, - json={"username": self.config.vms_user, "password": self.config.vms_password} - ) - resp.raise_for_status() - token = resp.json()["access"] - self.headers['authorization'] = f"Bearer {token}" - except ConnectionError as e: - raise ApiError( - response=Bunch( - status_code=None, - text=f"The vms on the designated host {self.config.vms_host!r} " - f"cannot be accessed. Please verify the specified endpoint. " - f"origin error: {e}" - )) - self.usage_report() - - @resilient.error(msg="failed to report usage to VMS") - def usage_report(self): - self.post("plugins/usage/", data={ - "vendor": "vastdata", "name": "vast-csi", - "version": self.config.plugin_version, "build": self.config.git_commit[:10] - }) + self.headers['authorization'] = f"Bearer #" # will be updated on first request @retrying.debug(times=3, acceptable=retrying.Retry) def request(self, verb, api_method, *args, params=None, log_result=True, **kwargs): @@ -167,13 +149,88 @@ def func(*args, log_result=True, **params): setattr(self, attr, func) return func - class VmsSession(RESTSession): """ Communication with vms cluster. Operations over vip pools, quotas, snapshots etc. """ - _vip_round_robin_idx: ClassVar[int] = -1 + def __init__(self, config, username, password, endpoint, ssl_cert): + super().__init__(config) + self.username = username + self.password = password + self.endpoint = endpoint + self.ssl_cert = ssl_cert + self.base_url = f"https://{endpoint}/api/v1" + # Modify the SSL verification CA bundle path established + # by the underlying Certifi library's defaults if ssl_verify==True. 
+ certs_base_dir = "/etc/ssl/certs" + if ssl_cert: + # Store the certificate specified in StorageClass secret (unique for each StorageClass) + hash_obj = hashlib.sha256("".join([username, password, endpoint]).encode()) + unique_hash = hash_obj.hexdigest() + cert_path = f"{certs_base_dir}/{endpoint}-{unique_hash}.crt" + with open(cert_path, "w") as f: + f.write(ssl_cert) + logger.info(f"Generated new ssl certificate: {cert_path!r}") + else: + # Use certificate provided from global `sslCertsSecretName` secret (common for all StorageClasses) + # This way requests library can use mounted CA bundle or default system CA bundle under the same path. + cert_path = f"{certs_base_dir}/ca-certificates.crt" + self.ssl_verify = (False, cert_path)[config.ssl_verify] + + def serialize(self, salt: str): + session_data = pickle.dumps((self.username, self.password, self.endpoint, self.ssl_cert)) + iv = os.urandom(16) + key = _derive_key(salt) + cipher = Cipher(algorithms.AES(key), modes.CFB(iv), backend=default_backend()) + encryptor = cipher.encryptor() + ciphertext = encryptor.update(session_data) + encryptor.finalize() + # Return IV and ciphertext (both base64 encoded for storage) + return base64.b64encode(iv + ciphertext).decode() + + @classmethod + def deserialize(cls, salt: str, encrypted_data: str): + encrypted_data = base64.b64decode(encrypted_data) + # Extract IV and ciphertext + iv = encrypted_data[:16] + ciphertext = encrypted_data[16:] + # Create cipher object + key = _derive_key(salt) + cipher = Cipher(algorithms.AES(key), modes.CFB(iv), backend=default_backend()) + decryptor = cipher.decryptor() + # Decrypt the data + plainbytes = decryptor.update(ciphertext) + decryptor.finalize() + username, password, endpoint, ssl_cert = pickle.loads(plainbytes) + return get_vms_session(username=username, password=password, endpoint=endpoint, ssl_cert=ssl_cert) + + @classmethod + def create(cls, config, username, password, endpoint, ssl_cert): + """ + Create instance of session. 
+        username, password and endpoint are optional; in the context of the CSI driver they come from the StorageClass secret if passed as arguments.
+        Otherwise, username, password and endpoint are taken from locally mounted secret (COSI case).
+        """
+        # The presence of the username in the arguments already indicates
+        # that we have a StorageClass scope secret at this point.
+        # In other words, it's not a globally mounted secret. Other secret fields will be validated below.
+        is_global = not bool(username)
+        if config.vms_credentials_store.exists() and is_global:
+            username = config.vms_user
+            password = config.vms_password
+            endpoint = config.vms_host
+            if not endpoint:
+                raise LookupFieldError(field="endpoint", tip="Make sure endpoint is specified in values.yaml.")
+        if not username:
+            raise LookupFieldError(field="username", tip="Make sure username is present in secret.")
+        if not password:
+            raise LookupFieldError(field="password", tip="Make sure password is present in secret.")
+        if not endpoint:
+            raise LookupFieldError(field="endpoint", tip="Make sure endpoint is present in secret.")
+        session = cls(config, username, password, endpoint, ssl_cert)
+        config_source = "mounted configuration" if is_global else "secret"
+        ssl_verification = "enabled" if session.ssl_verify else "disabled"
+        logger.info(f"VMS session has been instantiated from {config_source}. 
SSL verification {ssl_verification}.") + return session @property @timecache(HOUR) @@ -200,6 +257,33 @@ def delete_folder(self, path: str, tenant_id: int): # unpredictable error raise + def refresh_auth_token(self): + try: + resp = super(RESTSession, self).request( + "POST", f"{self.base_url}/token/", verify=self.ssl_verify, timeout=5, + json={"username": self.username, "password": self.password} + ) + resp.raise_for_status() + token = resp.json()["access"] + self.headers['authorization'] = f"Bearer {token}" + except ConnectionError as e: + raise ApiError( + response=Bunch( + status_code=None, + text=f"The vms on the designated host {self.config.vms_host!r} " + f"cannot be accessed. Please verify the specified endpoint. " + f"origin error: {e}" + )) + self.usage_report() + + @requisite(semver="5.2.0", ignore=True) + @resilient.error(msg="failed to report usage to VMS") + def usage_report(self): + self.post("plugins/usage/", data={ + "vendor": "vastdata", "name": "vast-csi", + "version": self.config.plugin_version, "build": self.config.git_commit[:10] + }) + # ---------------------------- # View policies def get_view_policy(self, policy_name: str): @@ -300,37 +384,35 @@ def temp_view(self, path, policy_id, tenant_id) -> Bunch: self.delete_view_by_id(view.id) # ---------------------------- + @timecache(5 * MINUTE) + def get_vip_pool(self, vip_pool_name: str) -> Bunch: + if not (vippools := self.vippools(name=vip_pool_name)): + raise Exception(f"No VIP Pool named '{vip_pool_name}'") + return vippools[0] + # Vip pools - def get_vip(self, vip_pool_name: str, tenant_id: int = None, load_balancing: str = None): + def get_vip(self, vip_pool_name: str, tenant_id: int = None): """ - Get vip pool by provided vip_pool_name. + Get vip by provided vip_pool_name. + tenant_id is optional argument for validation. tenant_id usually + make sense only during volume deletion where deletionVipPool and deletionViewPolicy + is used. 
For such case additional validation might help to troubleshoot + tenant misconfiguration. Returns: - One of ips from provided vip pool according to provided load balancing strategy. + Random vip ip from provided vip pool. """ - load_balancing = parse_load_balancing_strategy(load_balancing or self.config.load_balancing) - if not (vippools := self.vippools(name=vip_pool_name)): - raise Exception(f"No VIP Pool named '{vip_pool_name}'") - - vippool = vippools[0] + vippool = self.get_vip_pool(vip_pool_name) + if isinstance(tenant_id, str): + # for tenant_id passed as volume context. + tenant_id = int(tenant_id) if tenant_id and vippool.tenant_id != tenant_id: raise Exception( f"Pool {vip_pool_name} belongs to tenant with id {vippool.tenant_id} but {tenant_id=} was requested" ) vips = generate_ip_range(vippool.ip_ranges) assert vips, f"Pool {vip_pool_name} has no available vips" - if load_balancing == ROUNDROBIN: - self._vip_round_robin_idx = (self._vip_round_robin_idx + 1) % len(vips) - vip = vips[self._vip_round_robin_idx] - elif load_balancing == RANDOM: - vip = shuffled(vips)[0] - else: - raise Exception( - f"Invalid load_balancing mode: '{load_balancing}'" - ) - - logger.info( - f"Using {load_balancing} - chose {vip}" - ) + vip = shuffled(vips)[0] + logger.info(f"Using - {vip}") return vip # ---------------------------- @@ -476,6 +558,13 @@ def delete_access_key(self, user_id, access_key): class TestVmsSession(RESTSession): """RestSession simulation for sanity tests""" + def __init__(self, config): + super().__init__(config) + + @classmethod + def create(cls, config: Config, *_, **__): + return cls(config) + def create_fake_quota(self, volume_id): class FakeQuota: diff --git a/vast_csi/volume_builder.py b/vast_csi/volume_builder.py index e92c8179..c77bdb1d 100644 --- a/vast_csi/volume_builder.py +++ b/vast_csi/volume_builder.py @@ -1,6 +1,8 @@ import os from dataclasses import dataclass from abc import ABC +from base64 import b32encode +from random import 
getrandbits from datetime import timedelta from typing import Optional, final, TypeVar @@ -46,7 +48,7 @@ def build_volume(self, **kwargs) -> CreatedVolumeT: class BaseBuilder(VolumeBuilderI): """Common builder with shared methods/attributes""" - controller: "ControllerServicer" + vms_session: "RESTSession" configuration: "CONF" name: str # Name of volume or snapshot @@ -54,9 +56,9 @@ class BaseBuilder(VolumeBuilderI): root_export: str volume_name_fmt: str view_policy: str - vip_pool_name: str + vip_pool_name: Optional[str] + vip_pool_fqdn: Optional[str] mount_options: str - lb_strategy: str qos_policy: Optional[str] capacity_range: Optional[int] # Optional desired volume capacity @@ -96,14 +98,17 @@ def mount_protocol(self): @property def volume_context(self): - return { + context = { "root_export": self.root_export_abs, - "vip_pool_name": self.vip_pool_name, - "lb_strategy": self.lb_strategy, "mount_options": self.mount_options, "view_policy": self.view_policy, "protocol": self.mount_protocol, } + if self.vip_pool_name: + context["vip_pool_name"] = self.vip_pool_name + elif self.vip_pool_fqdn: + context["vip_pool_fqdn"] = self.vip_pool_fqdn_with_prefix + return context @property def view_path(self): @@ -113,6 +118,11 @@ def view_path(self): def root_export_abs(self): return os.path.join("/", self.root_export) + @property + def vip_pool_fqdn_with_prefix(self): + prefix = b32encode(getrandbits(16).to_bytes(2, "big")).decode("ascii").rstrip("=") + return f"{prefix}.{self.vip_pool_fqdn}" + def build_volume(self) -> types.Volume: """ Main build entrypoint for volumes. @@ -124,7 +134,7 @@ def build_volume(self) -> types.Volume: volume_context["volume_name"] = volume_name # Check if view with expected system path already exists. 
- view = self.controller.vms_session.ensure_view( + view = self.vms_session.ensure_view( path=self.view_path, protocols=[self.mount_protocol], view_policy=self.view_policy, qos_policy=self.qos_policy ) quota = self._ensure_quota(requested_capacity, volume_name, self.view_path, view.tenant_id) @@ -137,7 +147,7 @@ def build_volume(self) -> types.Volume: ) def _ensure_quota(self, requested_capacity, volume_name, view_path, tenant_id): - if quota := self.controller.vms_session.get_quota(self.name): + if quota := self.vms_session.get_quota(self.name): # Check if volume with provided name but another capacity already exists. if quota.hard_limit != requested_capacity: raise VolumeAlreadyExists( @@ -156,7 +166,7 @@ def _ensure_quota(self, requested_capacity, volume_name, view_path, tenant_id): ) if requested_capacity: data.update(hard_limit=requested_capacity) - quota = self.controller.vms_session.create_quota(data=data) + quota = self.vms_session.create_quota(data=data) return quota @@ -171,7 +181,7 @@ def build_volume(self) -> types.Volume: volume_context["volume_name"] = volume_name source_volume_id = self.volume_content_source.volume.volume_id - if not (source_quota := self.controller.vms_session.get_quota(source_volume_id)): + if not (source_quota := self.vms_session.get_quota(source_volume_id)): raise SourceNotFound(f"Unknown volume: {source_volume_id}") source_path = source_quota.path @@ -179,17 +189,17 @@ def build_volume(self) -> types.Volume: snapshot_name = f"snp-{self.name}" snapshot_stream_name = f"strm-{self.name}" - snapshot = self.controller.vms_session.ensure_snapshot( + snapshot = self.vms_session.ensure_snapshot( snapshot_name=snapshot_name, path=source_path, tenant_id=tenant_id, expiration_delta=timedelta(minutes=5) ) - snapshot_stream = self.controller.vms_session.ensure_snapshot_stream( + snapshot_stream = self.vms_session.ensure_snapshot_stream( snapshot_id=snapshot.id, destination_path=self.view_path, tenant_id=tenant_id, 
snapshot_stream_name=snapshot_stream_name, ) # View should go after snapshot stream. # Otherwise, snapshot stream action will detect folder already exist and will be rejected - view = self.controller.vms_session.ensure_view( + view = self.vms_session.ensure_view( path=self.view_path, protocols=[self.mount_protocol], view_policy=self.view_policy, qos_policy=self.qos_policy ) @@ -219,7 +229,7 @@ def build_volume(self) -> types.Volume: Create snapshot representation. """ source_snapshot_id = self.volume_content_source.snapshot.snapshot_id - if not (snapshot := self.controller.vms_session.get_snapshot(snapshot_id=source_snapshot_id)): + if not (snapshot := self.vms_session.get_snapshot(snapshot_id=source_snapshot_id)): raise SourceNotFound(f"Unknown snapshot: {source_snapshot_id}") volume_context = self.volume_context @@ -234,11 +244,11 @@ def build_volume(self) -> types.Volume: volume_context["volume_name"] = volume_name snapshot_stream_name = f"strm-{self.name}" - snapshot_stream = self.controller.vms_session.ensure_snapshot_stream( + snapshot_stream = self.vms_session.ensure_snapshot_stream( snapshot_id=snapshot.id, destination_path=self.view_path, tenant_id=tenant_id, snapshot_stream_name=snapshot_stream_name, ) - view = self.controller.vms_session.ensure_view( + view = self.vms_session.ensure_view( path=self.view_path, protocols=[self.mount_protocol], view_policy=self.view_policy, qos_policy=self.qos_policy ) @@ -279,7 +289,7 @@ def build_volume_name(self) -> str: pass def get_existing_capacity(self) -> Optional[int]: - volume = self.controller.vms_session.get_quota(self.name) + volume = self.vms_session.get_quota(self.name) if volume: return volume.capacity_bytes @@ -301,7 +311,7 @@ def build_volume(self) -> types.Volume: f"({existing_capacity})", ) - vol_dir = self.controller.vms_session._mock_mount[self.name] + vol_dir = self.vms_session._mock_mount[self.name] vol_dir.mkdir() volume = types.Volume( diff --git a/version.txt b/version.txt index 8721bbc4..a3721209 
100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v2.4.0 +v2.4.1