diff --git a/cluster-tests/tests/cloudsync/__init__.py b/cluster-tests/tests/cloudsync/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/cluster-tests/tests/cloudsync/test_cloudsync.py b/cluster-tests/tests/cloudsync/test_cloudsync.py
new file mode 100644
index 0000000000000..3d6dda3ce8532
--- /dev/null
+++ b/cluster-tests/tests/cloudsync/test_cloudsync.py
@@ -0,0 +1,56 @@
+import pytest
+
+from middlewared.client import ClientException
+from middlewared.test.integration.assets.cloudsync import *
+
+from config import CLUSTER_INFO, CLUSTER_IPS
+from utils import make_request, ssh_test, wait_on_job
+
+LOCAL_PATH = f'/cluster/{CLUSTER_INFO["GLUSTER_VOLUME"]}/cloudsync_01'
+CLUSTER_PATH = f'CLUSTER:{CLUSTER_INFO["GLUSTER_VOLUME"]}/cloudsync_01'
+
+
+def test_works():
+    """Run a cloud sync task on a cluster path and check the source file is still readable."""
+    res = ssh_test(CLUSTER_IPS[0], f'mkdir {LOCAL_PATH}')
+    assert res['result'], res['stderr']
+    try:
+        res = ssh_test(CLUSTER_IPS[0], f'echo test > {LOCAL_PATH}/file01')
+        assert res['result'], res['stderr']
+
+        with local_s3_task({
+            "path": CLUSTER_PATH,
+        }) as task:
+            run_task(task)
+
+        res = ssh_test(CLUSTER_IPS[0], f'cat {LOCAL_PATH}/file01')
+        assert res['result'], res['stderr']
+
+        assert res['output'] == 'test\n'
+    finally:
+        # Single cleanup point: runs whether or not the steps above succeeded.
+        res = ssh_test(CLUSTER_IPS[0], f'rm -rf {LOCAL_PATH}')
+        assert res['result'], res['stderr']
+
+
+def test_invalid_cluster_path():
+    """Running a task whose cluster directory was never created must fail with EFAULT."""
+    with pytest.raises(ClientException) as e:
+        with local_s3_task({
+            "path": CLUSTER_PATH,
+        }) as task:
+            run_task(task)
+
+    assert str(e.value) == f"[EFAULT] Directory '{CLUSTER_PATH}' does not exist"
+
+
+def test_cluster_path_snapshot():
+    """Creating a task with `snapshot` enabled on a cluster path must be rejected."""
+    with pytest.raises(ClientException) as e:
+        with local_s3_task({
+            "path": CLUSTER_PATH,
+            "snapshot": True
+        }):
+            pass
+
+    assert str(e.value) == "[EINVAL] cloud_sync_create.snapshot: This option can not be used for cluster paths"
diff --git
a/src/middlewared/middlewared/async_validators.py b/src/middlewared/middlewared/async_validators.py
index 4a2316029f707..0e6ae99c452f6 100644
--- a/src/middlewared/middlewared/async_validators.py
+++ b/src/middlewared/middlewared/async_validators.py
@@ -6,11 +6,7 @@
 from middlewared.validators import IpAddress
 
 
-async def check_path_resides_within_volume(verrors, middleware, name, path, gluster_bypass=False):
-
-    # when a sharing service is using gluster, the path checks below do not apply
-    if gluster_bypass:
-        return
+async def check_path_resides_within_volume(verrors, middleware, name, path):
 
     # we need to make sure the sharing service is configured within the zpool
     rp = os.path.realpath(path)
diff --git a/src/middlewared/middlewared/plugins/cloud_sync.py b/src/middlewared/middlewared/plugins/cloud_sync.py
index 1db6d17070f41..5324efdf74c8d 100644
--- a/src/middlewared/middlewared/plugins/cloud_sync.py
+++ b/src/middlewared/middlewared/plugins/cloud_sync.py
@@ -135,21 +135,34 @@ def get_remote_path(provider, attributes):
     return remote_path
 
 
-def check_local_path(path):
+def check_local_path(path, *, check_mountpoint=True, error_text_path=None):
+    error_text_path = error_text_path or path
+
     if not os.path.exists(path):
-        raise CallError(f"Directory {path!r} does not exist")
+        raise CallError(f"Directory {error_text_path!r} does not exist")
 
     if not os.path.isdir(path):
-        raise CallError(f"{path!r} is not a directory")
+        raise CallError(f"{error_text_path!r} is not a directory")
 
-    if not os.path.normpath(path).startswith("/mnt/") or os.stat(path).st_dev == os.stat("/mnt").st_dev:
-        raise CallError(f"Directory {path!r} must reside within volume mount point")
+    if check_mountpoint:
+        if not os.path.normpath(path).startswith("/mnt/") or os.stat(path).st_dev == os.stat("/mnt").st_dev:
+            raise CallError(f"Directory {error_text_path!r} must reside within volume mount point")
 
 
 async def rclone(middleware, job, cloud_sync, dry_run):
     await middleware.call("network.general.will_perform_activity", "cloud_sync")
 
-    await middleware.run_in_thread(check_local_path, cloud_sync["path"])
+    if await middleware.call("filesystem.is_cluster_path", cloud_sync["path"]):
+        path = await middleware.call("filesystem.resolve_cluster_path", cloud_sync["path"])
+        await middleware.run_in_thread(
+            check_local_path,
+            path,
+            check_mountpoint=False,
+            error_text_path=cloud_sync["path"],
+        )
+    else:
+        path = cloud_sync["path"]
+        await middleware.run_in_thread(check_local_path, path)
 
     # Use a temporary file to store rclone file
     async with RcloneConfig(cloud_sync) as config:
@@ -185,7 +198,6 @@ async def rclone(middleware, job, cloud_sync, dry_run):
         args += [cloud_sync["transfer_mode"].lower()]
 
         snapshot = None
-        path = cloud_sync["path"]
         if cloud_sync["direction"] == "PUSH":
             if cloud_sync["snapshot"]:
                 dataset, recursive = get_dataset_recursive(
@@ -826,16 +838,18 @@ async def _validate(self, verrors, name, data):
             if limit1["time"] >= limit2["time"]:
                 verrors.add(f"{name}.bwlimit.{i + 1}.time", f"Invalid time order: {limit1['time']}, {limit2['time']}")
 
-        await self.validate_path_field(data, name, verrors)
+        await self.validate_path_field(data, name, verrors, allow_cluster=True)
 
         if data["snapshot"]:
             if data["direction"] != "PUSH":
                 verrors.add(f"{name}.snapshot", "This option can only be enabled for PUSH tasks")
             if data["transfer_mode"] == "MOVE":
                 verrors.add(f"{name}.snapshot", "This option can not be used for MOVE transfer mode")
-            if await self.middleware.call("pool.dataset.query",
-                                          [["name", "^", os.path.relpath(data["path"], "/mnt") + "/"],
-                                           ["type", "=", "FILESYSTEM"]]):
+            if await self.middleware.call("filesystem.is_cluster_path", data["path"]):
+                verrors.add(f"{name}.snapshot", "This option can not be used for cluster paths")
+            elif await self.middleware.call("pool.dataset.query",
+                                            [["name", "^", os.path.relpath(data["path"], "/mnt") + "/"],
+                                             ["type", "=", "FILESYSTEM"]]):
                 verrors.add(f"{name}.snapshot", "This option is only available for datasets that have no further "
                                                 "nesting")
 
diff --git a/src/middlewared/middlewared/plugins/filesystem.py b/src/middlewared/middlewared/plugins/filesystem.py
index 856ab743fca36..aec9346a01599 100644
--- a/src/middlewared/middlewared/plugins/filesystem.py
+++ b/src/middlewared/middlewared/plugins/filesystem.py
@@ -42,6 +42,10 @@ def set_immutable(self, set_flag, path):
         """
         chflags.set_immutable(path, set_flag)
 
+    @private
+    def is_cluster_path(self, path):
+        return path.startswith(FuseConfig.FUSE_PATH_SUBST.value)
+
     @private
     def resolve_cluster_path(self, path, ignore_ctdb=False):
         """
diff --git a/src/middlewared/middlewared/plugins/filesystem_/acl_linux.py b/src/middlewared/middlewared/plugins/filesystem_/acl_linux.py
index a8d21ebcc38db..b17a51456d212 100644
--- a/src/middlewared/middlewared/plugins/filesystem_/acl_linux.py
+++ b/src/middlewared/middlewared/plugins/filesystem_/acl_linux.py
@@ -6,7 +6,6 @@
 from pathlib import Path
 
 from middlewared.service import private, CallError, ValidationErrors, Service
-from middlewared.plugins.cluster_linux.utils import FuseConfig
 from .acl_base import ACLBase, ACLType
 
 
@@ -32,7 +31,7 @@ def acltool(self, path, action, uid, gid, options):
             raise CallError(f"acltool [{action}] on path {path} failed with error: [{acltool.stderr.decode().strip()}]")
 
     def _common_perm_path_validate(self, schema, data, verrors):
-        is_cluster = data['path'].startswith(FuseConfig.FUSE_PATH_SUBST.value)
+        is_cluster = self.middleware.call_sync('filesystem.is_cluster_path', data['path'])
         try:
             data['path'] = self.middleware.call_sync('filesystem.resolve_cluster_path', data['path'])
         except CallError as e:
diff --git a/src/middlewared/middlewared/plugins/nfs.py b/src/middlewared/middlewared/plugins/nfs.py
index ad6f76aa65815..f9552dd224842 100644
--- a/src/middlewared/middlewared/plugins/nfs.py
+++ b/src/middlewared/middlewared/plugins/nfs.py
@@ -431,15 +431,13 @@ async def validate(self, data, schema_name, verrors, old=None):
             verrors.add(f"{schema_name}.alldirs", "This option can only be used for shares that contain single path")
 
         # if any of the `paths` that were passed to us by user are within the gluster volume
-        # mountpoint then we need to pass the `gluster_bypass` kwarg so that we don't raise a
-        # validation error complaining about using a gluster path within the zpool mountpoint
-        bypass = any('.glusterfs' in i for i in data["paths"] + data["aliases"])
-
-        # need to make sure that the nfs share is within the zpool mountpoint
-        for idx, i in enumerate(data["paths"]):
-            await check_path_resides_within_volume(
-                verrors, self.middleware, f'{schema_name}.paths.{idx}', i, gluster_bypass=bypass
-            )
+        # then we don't need to check whether the path is within the zpool mountpoint
+        if not any('.glusterfs' in i for i in data["paths"] + data["aliases"]):
+            # need to make sure that the nfs share is within the zpool mountpoint
+            for idx, i in enumerate(data["paths"]):
+                await check_path_resides_within_volume(
+                    verrors, self.middleware, f'{schema_name}.paths.{idx}', i,
+                )
 
         await self.middleware.run_in_thread(self.validate_paths, data, schema_name, verrors)
 
diff --git a/src/middlewared/middlewared/plugins/smb.py b/src/middlewared/middlewared/plugins/smb.py
index 3fad308e5bba2..3ba8d586f506f 100644
--- a/src/middlewared/middlewared/plugins/smb.py
+++ b/src/middlewared/middlewared/plugins/smb.py
@@ -1458,12 +1458,11 @@ async def validate(self, data, schema_name, verrors, old=None):
                 verrors.add(f'{schema_name}.home',
                             'Only one share is allowed to be a home share.')
 
-        bypass = bool(data['cluster_volname'])
-
         await self.cluster_share_validate(data, schema_name, verrors)
 
         if data['path']:
-            await self.validate_path_field(data, schema_name, verrors, bypass=bypass)
+            if not data['cluster_volname']:
+                await self.validate_path_field(data, schema_name, verrors)
 
         """
         When path is not a clustervolname, legacy behavior is to make all path components
diff --git a/src/middlewared/middlewared/service.py
b/src/middlewared/middlewared/service.py
index 0b4bc10f7201c..ea8b0f46fba54 100644
--- a/src/middlewared/middlewared/service.py
+++ b/src/middlewared/middlewared/service.py
@@ -1078,10 +1078,16 @@ async def sharing_task_extend_context(self, rows, extra):
         }
 
     @private
-    async def validate_path_field(self, data, schema, verrors, bypass=False):
-        await check_path_resides_within_volume(
-            verrors, self.middleware, f'{schema}.{self.path_field}', data.get(self.path_field), gluster_bypass=bypass,
-        )
+    async def validate_path_field(self, data, schema, verrors, *, allow_cluster=False):
+        name = f'{schema}.{self.path_field}'
+        path = data[self.path_field]
+
+        if await self.middleware.call('filesystem.is_cluster_path', path):
+            if not allow_cluster:
+                verrors.add(name, 'Cluster path is not allowed')
+        else:
+            await check_path_resides_within_volume(verrors, self.middleware, name, path)
+
         return verrors
 
     @private
diff --git a/src/middlewared/middlewared/test/integration/assets/cloudsync.py b/src/middlewared/middlewared/test/integration/assets/cloudsync.py
new file mode 100644
index 0000000000000..e61d69214aaa7
--- /dev/null
+++ b/src/middlewared/middlewared/test/integration/assets/cloudsync.py
@@ -0,0 +1,90 @@
+# -*- coding=utf-8 -*-
+import contextlib
+import logging
+
+from middlewared.test.integration.assets.pool import dataset
+from middlewared.test.integration.assets.s3 import s3_server
+from middlewared.test.integration.utils import call
+
+logger = logging.getLogger(__name__)
+
+__all__ = ["credential", "task", "local_s3_credential", "local_s3_task", "run_task"]
+
+
+@contextlib.contextmanager
+def credential(data):
+    data = {
+        "name": "Test",
+        **data,
+    }
+
+    credential = call("cloudsync.credentials.create", data)
+    try:
+        yield credential
+    finally:
+        call("cloudsync.credentials.delete", credential["id"])
+
+
+@contextlib.contextmanager
+def task(data):
+    data = {
+        "description": "Test",
+        "schedule": {
+            "minute": "00",
+            "hour": "00",
+            "dom": "1",
+            "month": "1",
+            "dow": "1",
+        },
+        **data
+    }
+
+    task = call("cloudsync.create", data)
+    try:
+        yield task
+    finally:
+        call("cloudsync.delete", task["id"])
+
+
+@contextlib.contextmanager
+def local_s3_credential(credential_params=None):
+    credential_params = credential_params or {}
+
+    with dataset("cloudsync_remote") as remote_dataset:
+        with s3_server(remote_dataset) as s3:
+            with credential({
+                "provider": "S3",
+                "attributes": {
+                    "access_key_id": s3.access_key,
+                    "secret_access_key": s3.secret_key,
+                    "endpoint": "http://localhost:9000",
+                    "skip_region": True,
+                    **credential_params,
+                },
+            }) as c:
+                yield c
+
+
+@contextlib.contextmanager
+def local_s3_task(params=None, credential_params=None):
+    params = params or {}
+    credential_params = credential_params or {}
+
+    with dataset("cloudsync_local") as local_dataset:
+        with local_s3_credential(credential_params) as c:
+            with task({
+                "direction": "PUSH",
+                "transfer_mode": "COPY",
+                "path": f"/mnt/{local_dataset}",
+                "credentials": c["id"],
+                "attributes": {
+                    "bucket": "bucket",
+                    "folder": "",
+                },
+                **params,
+            }) as t:
+                yield t
+
+
+def run_task(task, timeout=120):
+    call("cloudsync.sync", task["id"], job=True, timeout=timeout)
diff --git a/src/middlewared/middlewared/test/integration/utils/client.py b/src/middlewared/middlewared/test/integration/utils/client.py
index d94c63d450683..af690e3b5787a 100644
--- a/src/middlewared/middlewared/test/integration/utils/client.py
+++ b/src/middlewared/middlewared/test/integration/utils/client.py
@@ -9,6 +9,13 @@
 
 @contextlib.contextmanager
 def client():
-    with Client(f"ws://{os.environ['MIDDLEWARE_TEST_IP']}/websocket", py_exceptions=True) as c:
-        c.call("auth.login", "root", os.environ["MIDDLEWARE_TEST_PASSWORD"])
+    if "NODE_A_IP" in os.environ:
+        host = os.environ["NODE_A_IP"]
+        password = os.environ["APIPASS"]
+    else:
+        host = os.environ["MIDDLEWARE_TEST_IP"]
+        password = os.environ["MIDDLEWARE_TEST_PASSWORD"]
+
+    with Client(f"ws://{host}/websocket", py_exceptions=True) as c:
+        c.call("auth.login", "root", password)
         yield c
diff --git a/src/middlewared/middlewared/test/integration/utils/pool.py b/src/middlewared/middlewared/test/integration/utils/pool.py
index 261665424c2d0..c792ef794f9df 100644
--- a/src/middlewared/middlewared/test/integration/utils/pool.py
+++ b/src/middlewared/middlewared/test/integration/utils/pool.py
@@ -8,7 +8,7 @@
     sys.path.append(apifolder)
     from auto_config import pool_name
 except ImportError:
-    pool_name = None
+    pool_name = os.environ["ZPOOL"]
 
 logger = logging.getLogger(__name__)
 
diff --git a/tests/api2/test_cloudsync.py b/tests/api2/test_cloudsync.py
index bad1a0b752bd5..3cb42606d141f 100644
--- a/tests/api2/test_cloudsync.py
+++ b/tests/api2/test_cloudsync.py
@@ -1,126 +1,23 @@
-import contextlib
 import re
 import time
 
 import pytest
 
+from middlewared.test.integration.assets.cloudsync import *
 from middlewared.test.integration.assets.ftp import anonymous_ftp_server, ftp_server_with_user_account
 from middlewared.test.integration.assets.pool import dataset
-from middlewared.test.integration.assets.s3 import s3_server
 from middlewared.test.integration.utils import call, pool, ssh
 
 import sys
 import os
 apifolder = os.getcwd()
 sys.path.append(apifolder)
-from functions import PUT, POST, GET, DELETE
 from auto_config import dev_test
 reason = 'Skip for testing'
 # comment pytestmark for development testing with --dev-test
 pytestmark = pytest.mark.skipif(dev_test, reason=reason)
 
 
-@contextlib.contextmanager
-def credential(data):
-    data = {
-        "name": "Test",
-        **data,
-    }
-
-    result = POST("/cloudsync/credentials/", data)
-    assert result.status_code == 200, result.text
-    credential = result.json()
-
-    try:
-        yield credential
-    finally:
-        result = DELETE(f"/cloudsync/credentials/id/{credential['id']}/")
-        assert result.status_code == 200, result.text
-
-
-@contextlib.contextmanager
-def task(data):
-    data = {
-        "description": "Test",
-        "schedule": {
-            "minute": "00",
-            "hour": "00",
-            "dom": "1",
-            "month": "1",
-            "dow": "1",
-        },
-        **data
-    }
-
-    result = POST("/cloudsync/", data)
-    assert result.status_code == 200, result.text
-    task = result.json()
-
-    try:
-        yield task
-    finally:
-        result = DELETE(f"/cloudsync/id/{task['id']}/")
-        assert result.status_code == 200, result.text
-
-
-@contextlib.contextmanager
-def local_s3_credential(credential_params=None):
-    credential_params = credential_params or {}
-
-    with dataset("cloudsync_remote") as remote_dataset:
-        with s3_server(remote_dataset) as s3:
-            with credential({
-                "provider": "S3",
-                "attributes": {
-                    "access_key_id": s3.access_key,
-                    "secret_access_key": s3.secret_key,
-                    "endpoint": "http://localhost:9000",
-                    "skip_region": True,
-                    **credential_params,
-                },
-            }) as c:
-                yield c
-
-@contextlib.contextmanager
-def local_s3_task(params=None, credential_params=None):
-    params = params or {}
-    credential_params = credential_params or {}
-
-    with dataset("cloudsync_local") as local_dataset:
-        with local_s3_credential(credential_params) as c:
-            with task({
-                "direction": "PUSH",
-                "transfer_mode": "COPY",
-                "path": f"/mnt/{local_dataset}",
-                "credentials": c["id"],
-                "attributes": {
-                    "bucket": "bucket",
-                    "folder": "",
-                },
-                **params,
-            }) as t:
-                yield t
-
-
-def run_task(task):
-    result = POST(f"/cloudsync/id/{task['id']}/sync/")
-    assert result.status_code == 200, result.text
-    for i in range(120):
-        result = GET(f"/cloudsync/id/{task['id']}/")
-        assert result.status_code == 200, result.text
-        state = result.json()
-        if state["job"] is None:
-            time.sleep(1)
-            continue
-        if state["job"]["state"] in ["WAITING", "RUNNING"]:
-            time.sleep(1)
-            continue
-        assert state["job"]["state"] == "SUCCESS", state
-        return
-
-    assert False, state
-
-
 def test_include():
     with local_s3_task({
         "include": ["/office/**", "/work/**"],