Allow CLUSTER: locations to be used for cloud sync tasks

themylogin committed Mar 11, 2022
1 parent 7e66d2e commit f6198f7
Showing 13 changed files with 207 additions and 141 deletions.
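In effect, a cloud sync task can now point straight at a gluster volume by using the CLUSTER: path prefix instead of a /mnt path. A minimal sketch of creating such a task through the middleware client (the volume name "gvol01", the credential id, and the schedule values are illustrative, mirroring the test assets below; this is not part of the commit itself):

# Hypothetical example: create a PUSH task rooted at a cluster path.
from middlewared.client import Client

with Client() as c:
    c.call("cloudsync.create", {
        "description": "Cluster backup",
        "direction": "PUSH",
        "transfer_mode": "COPY",
        "path": "CLUSTER:gvol01/cloudsync_01",   # cluster path, not /mnt/...
        "credentials": 1,                        # assumed existing credential id
        "attributes": {"bucket": "bucket", "folder": ""},
        "schedule": {"minute": "00", "hour": "00", "dom": "1", "month": "1", "dow": "1"},
    })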
Empty file.
56 changes: 56 additions & 0 deletions cluster-tests/tests/cloudsync/test_cloudsync.py
@@ -0,0 +1,56 @@
import pytest

from middlewared.client import ClientException
from middlewared.test.integration.assets.cloudsync import *

from config import CLUSTER_INFO, CLUSTER_IPS
from utils import make_request, ssh_test, wait_on_job

LOCAL_PATH = f'/cluster/{CLUSTER_INFO["GLUSTER_VOLUME"]}/cloudsync_01'
CLUSTER_PATH = f'CLUSTER:{CLUSTER_INFO["GLUSTER_VOLUME"]}/cloudsync_01'


def test_works():
    res = ssh_test(CLUSTER_IPS[0], f'mkdir {LOCAL_PATH}')
    assert res['result'], res['stderr']
    try:
        res = ssh_test(CLUSTER_IPS[0], f'echo test > {LOCAL_PATH}/file01')
        assert res['result'], res['stderr']

        try:
            with local_s3_task({
                "path": CLUSTER_PATH,
            }) as task:
                run_task(task)

            res = ssh_test(CLUSTER_IPS[0], f'cat {LOCAL_PATH}/file01')
            assert res['result'], res['stderr']

            assert res['output'] == 'test\n'
        finally:
            res = ssh_test(CLUSTER_IPS[0], f'rm -rf {LOCAL_PATH}')
            assert res['result'], res['stderr']
    finally:
        res = ssh_test(CLUSTER_IPS[0], f'rm -rf {LOCAL_PATH}')
        assert res['result'], res['stderr']


def test_invalid_cluster_path():
    with pytest.raises(ClientException) as e:
        with local_s3_task({
            "path": CLUSTER_PATH,
        }) as task:
            run_task(task)

    assert str(e.value) == f"[EFAULT] Directory '{CLUSTER_PATH}' does not exist"


def test_cluster_path_snapshot():
    with pytest.raises(ClientException) as e:
        with local_s3_task({
            "path": CLUSTER_PATH,
            "snapshot": True
        }) as task:
            pass

    assert str(e.value) == f"[EINVAL] cloud_sync_create.snapshot: This option can not be used for cluster paths"
6 changes: 1 addition & 5 deletions src/middlewared/middlewared/async_validators.py

@@ -6,11 +6,7 @@
 from middlewared.validators import IpAddress
 
 
-async def check_path_resides_within_volume(verrors, middleware, name, path, gluster_bypass=False):
-
-    # when a sharing service is using gluster, the path checks below do not apply
-    if gluster_bypass:
-        return
+async def check_path_resides_within_volume(verrors, middleware, name, path):
 
     # we need to make sure the sharing service is configured within the zpool
     rp = os.path.realpath(path)
36 changes: 25 additions & 11 deletions src/middlewared/middlewared/plugins/cloud_sync.py

@@ -135,21 +135,34 @@ def get_remote_path(provider, attributes):
     return remote_path
 
 
-def check_local_path(path):
+def check_local_path(path, *, check_mountpoint=True, error_text_path=None):
+    error_text_path = error_text_path or path
+
     if not os.path.exists(path):
-        raise CallError(f"Directory {path!r} does not exist")
+        raise CallError(f"Directory {error_text_path!r} does not exist")
 
     if not os.path.isdir(path):
-        raise CallError(f"{path!r} is not a directory")
+        raise CallError(f"{error_text_path!r} is not a directory")
 
-    if not os.path.normpath(path).startswith("/mnt/") or os.stat(path).st_dev == os.stat("/mnt").st_dev:
-        raise CallError(f"Directory {path!r} must reside within volume mount point")
+    if check_mountpoint:
+        if not os.path.normpath(path).startswith("/mnt/") or os.stat(path).st_dev == os.stat("/mnt").st_dev:
+            raise CallError(f"Directory {error_text_path!r} must reside within volume mount point")
 
 
 async def rclone(middleware, job, cloud_sync, dry_run):
     await middleware.call("network.general.will_perform_activity", "cloud_sync")
 
-    await middleware.run_in_thread(check_local_path, cloud_sync["path"])
+    if await middleware.call("filesystem.is_cluster_path", cloud_sync["path"]):
+        path = await middleware.call("filesystem.resolve_cluster_path", cloud_sync["path"])
+        await middleware.run_in_thread(
+            check_local_path,
+            path,
+            check_mountpoint=False,
+            error_text_path=cloud_sync["path"],
+        )
+    else:
+        path = cloud_sync["path"]
+        await middleware.run_in_thread(check_local_path, path)
 
     # Use a temporary file to store rclone file
     async with RcloneConfig(cloud_sync) as config:

@@ -185,7 +198,6 @@ async def rclone(middleware, job, cloud_sync, dry_run):
     args += [cloud_sync["transfer_mode"].lower()]
 
     snapshot = None
-    path = cloud_sync["path"]
     if cloud_sync["direction"] == "PUSH":
         if cloud_sync["snapshot"]:
             dataset, recursive = get_dataset_recursive(

@@ -826,16 +838,18 @@ async def _validate(self, verrors, name, data):
             if limit1["time"] >= limit2["time"]:
                 verrors.add(f"{name}.bwlimit.{i + 1}.time", f"Invalid time order: {limit1['time']}, {limit2['time']}")
 
-        await self.validate_path_field(data, name, verrors)
+        await self.validate_path_field(data, name, verrors, allow_cluster=True)
 
         if data["snapshot"]:
             if data["direction"] != "PUSH":
                 verrors.add(f"{name}.snapshot", "This option can only be enabled for PUSH tasks")
             if data["transfer_mode"] == "MOVE":
                 verrors.add(f"{name}.snapshot", "This option can not be used for MOVE transfer mode")
-            if await self.middleware.call("pool.dataset.query",
-                                          [["name", "^", os.path.relpath(data["path"], "/mnt") + "/"],
-                                          ["type", "=", "FILESYSTEM"]]):
+            if await self.middleware.call("filesystem.is_cluster_path", data["path"]):
+                verrors.add(f"{name}.snapshot", "This option can not be used for cluster paths")
+            elif await self.middleware.call("pool.dataset.query",
+                                            [["name", "^", os.path.relpath(data["path"], "/mnt") + "/"],
+                                            ["type", "=", "FILESYSTEM"]]):
                 verrors.add(f"{name}.snapshot", "This option is only available for datasets that have no further "
                             "nesting")
 
4 changes: 4 additions & 0 deletions src/middlewared/middlewared/plugins/filesystem.py

@@ -42,6 +42,10 @@ def set_immutable(self, set_flag, path):
         """
         chflags.set_immutable(path, set_flag)
 
+    @private
+    def is_cluster_path(self, path):
+        return path.startswith(FuseConfig.FUSE_PATH_SUBST.value)
+
     @private
     def resolve_cluster_path(self, path, ignore_ctdb=False):
         """
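For reference, FuseConfig.FUSE_PATH_SUBST is the "CLUSTER:" prefix and cluster volumes are FUSE-mounted under /cluster, as the LOCAL_PATH/CLUSTER_PATH pair in the test file above suggests. A standalone sketch of the check and the resolution it pairs with (the constants are restated here as an assumption, not the middleware's actual implementation):

FUSE_PATH_SUBST = "CLUSTER:"    # assumed value of FuseConfig.FUSE_PATH_SUBST
CLUSTER_MOUNT_ROOT = "/cluster"  # assumed FUSE mount root for gluster volumes

def is_cluster_path(path: str) -> bool:
    # A cluster path is anything spelled with the CLUSTER: prefix.
    return path.startswith(FUSE_PATH_SUBST)

def resolve_cluster_path(path: str) -> str:
    # Map CLUSTER:<volume>/<dir> onto its FUSE mount point.
    return f"{CLUSTER_MOUNT_ROOT}/{path[len(FUSE_PATH_SUBST):]}"

assert is_cluster_path("CLUSTER:gvol01/cloudsync_01")
assert resolve_cluster_path("CLUSTER:gvol01/cloudsync_01") == "/cluster/gvol01/cloudsync_01"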
3 changes: 1 addition & 2 deletions src/middlewared/middlewared/plugins/filesystem_/acl_linux.py

@@ -6,7 +6,6 @@
 from pathlib import Path
 
 from middlewared.service import private, CallError, ValidationErrors, Service
-from middlewared.plugins.cluster_linux.utils import FuseConfig
 from .acl_base import ACLBase, ACLType

@@ -32,7 +31,7 @@ def acltool(self, path, action, uid, gid, options):
             raise CallError(f"acltool [{action}] on path {path} failed with error: [{acltool.stderr.decode().strip()}]")
 
     def _common_perm_path_validate(self, schema, data, verrors):
-        is_cluster = data['path'].startswith(FuseConfig.FUSE_PATH_SUBST.value)
+        is_cluster = self.middleware.call_sync('filesystem.is_cluster_path', data['path'])
         try:
             data['path'] = self.middleware.call_sync('filesystem.resolve_cluster_path', data['path'])
         except CallError as e:
16 changes: 7 additions & 9 deletions src/middlewared/middlewared/plugins/nfs.py

@@ -431,15 +431,13 @@ async def validate(self, data, schema_name, verrors, old=None):
             verrors.add(f"{schema_name}.alldirs", "This option can only be used for shares that contain single path")
 
         # if any of the `paths` that were passed to us by user are within the gluster volume
-        # mountpoint then we need to pass the `gluster_bypass` kwarg so that we don't raise a
-        # validation error complaining about using a gluster path within the zpool mountpoint
-        bypass = any('.glusterfs' in i for i in data["paths"] + data["aliases"])
-
-        # need to make sure that the nfs share is within the zpool mountpoint
-        for idx, i in enumerate(data["paths"]):
-            await check_path_resides_within_volume(
-                verrors, self.middleware, f'{schema_name}.paths.{idx}', i, gluster_bypass=bypass
-            )
+        # then we don't need to check whether the path is within the zpool mountpoint
+        if not any('.glusterfs' in i for i in data["paths"] + data["aliases"]):
+            # need to make sure that the nfs share is within the zpool mountpoint
+            for idx, i in enumerate(data["paths"]):
+                await check_path_resides_within_volume(
+                    verrors, self.middleware, f'{schema_name}.paths.{idx}', i,
+                )
 
         await self.middleware.run_in_thread(self.validate_paths, data, schema_name, verrors)
 
5 changes: 2 additions & 3 deletions src/middlewared/middlewared/plugins/smb.py

@@ -1458,12 +1458,11 @@ async def validate(self, data, schema_name, verrors, old=None):
             verrors.add(f'{schema_name}.home',
                         'Only one share is allowed to be a home share.')
 
-        bypass = bool(data['cluster_volname'])
-
         await self.cluster_share_validate(data, schema_name, verrors)
 
         if data['path']:
-            await self.validate_path_field(data, schema_name, verrors, bypass=bypass)
+            if not data['cluster_volname']:
+                await self.validate_path_field(data, schema_name, verrors)
 
             """
             When path is not a clustervolname, legacy behavior is to make all path components
14 changes: 10 additions & 4 deletions src/middlewared/middlewared/service.py

@@ -1078,10 +1078,16 @@ async def sharing_task_extend_context(self, rows, extra):
         }
 
     @private
-    async def validate_path_field(self, data, schema, verrors, bypass=False):
-        await check_path_resides_within_volume(
-            verrors, self.middleware, f'{schema}.{self.path_field}', data.get(self.path_field), gluster_bypass=bypass,
-        )
+    async def validate_path_field(self, data, schema, verrors, *, allow_cluster=False):
+        name = f'{schema}.{self.path_field}'
+        path = data[self.path_field]
+
+        if await self.middleware.call('filesystem.is_cluster_path', path):
+            if not allow_cluster:
+                verrors.add(name, 'Cluster path is not allowed')
+        else:
+            await check_path_resides_within_volume(verrors, self.middleware, name, path)
+
+        return verrors
 
     @private
90 changes: 90 additions & 0 deletions src/middlewared/middlewared/test/integration/assets/cloudsync.py

@@ -0,0 +1,90 @@
# -*- coding=utf-8 -*-
import contextlib
import logging

from middlewared.test.integration.assets.pool import dataset
from middlewared.test.integration.assets.s3 import s3_server
from middlewared.test.integration.utils import call

logger = logging.getLogger(__name__)

__all__ = ["credential", "task", "local_s3_credential", "local_s3_task", "run_task"]


@contextlib.contextmanager
def credential(data):
    data = {
        "name": "Test",
        **data,
    }

    credential = call("cloudsync.credentials.create", data)
    try:
        yield credential
    finally:
        call("cloudsync.credentials.delete", credential["id"])


@contextlib.contextmanager
def task(data):
    data = {
        "description": "Test",
        "schedule": {
            "minute": "00",
            "hour": "00",
            "dom": "1",
            "month": "1",
            "dow": "1",
        },
        **data
    }

    task = call("cloudsync.create", data)
    try:
        yield task
    finally:
        call("cloudsync.delete", task["id"])


@contextlib.contextmanager
def local_s3_credential(credential_params=None):
    credential_params = credential_params or {}

    with dataset("cloudsync_remote") as remote_dataset:
        with s3_server(remote_dataset) as s3:
            with credential({
                "provider": "S3",
                "attributes": {
                    "access_key_id": s3.access_key,
                    "secret_access_key": s3.secret_key,
                    "endpoint": "http://localhost:9000",
                    "skip_region": True,
                    **credential_params,
                },
            }) as c:
                yield c


@contextlib.contextmanager
def local_s3_task(params=None, credential_params=None):
    params = params or {}
    credential_params = credential_params or {}

    with dataset("cloudsync_local") as local_dataset:
        with local_s3_credential(credential_params) as c:
            with task({
                "direction": "PUSH",
                "transfer_mode": "COPY",
                "path": f"/mnt/{local_dataset}",
                "credentials": c["id"],
                "attributes": {
                    "bucket": "bucket",
                    "folder": "",
                },
                **params,
            }) as t:
                yield t


def run_task(task, timeout=120):
    call("cloudsync.sync", task["id"], job=True, timeout=timeout)
11 changes: 9 additions & 2 deletions src/middlewared/middlewared/test/integration/utils/client.py

@@ -9,6 +9,13 @@
 
 @contextlib.contextmanager
 def client():
-    with Client(f"ws://{os.environ['MIDDLEWARE_TEST_IP']}/websocket", py_exceptions=True) as c:
-        c.call("auth.login", "root", os.environ["MIDDLEWARE_TEST_PASSWORD"])
+    if "NODE_A_IP" in os.environ:
+        host = os.environ["NODE_A_IP"]
+        password = os.environ["APIPASS"]
+    else:
+        host = os.environ["MIDDLEWARE_TEST_IP"]
+        password = os.environ["MIDDLEWARE_TEST_PASSWORD"]
+
+    with Client(f"ws://{host}/websocket", py_exceptions=True) as c:
+        c.call("auth.login", "root", password)
         yield c
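With this change the same integration helpers can drive either a standalone middleware instance or a cluster node, selected purely through the environment. A hedged example of pointing the shared client at cluster node A (the IP and password are placeholders):

import os

# Placeholders: NODE_A_IP/APIPASS take precedence over
# MIDDLEWARE_TEST_IP/MIDDLEWARE_TEST_PASSWORD in client() above.
os.environ["NODE_A_IP"] = "10.0.0.11"
os.environ["APIPASS"] = "testing"

from middlewared.test.integration.utils import call
print(call("core.ping"))  # -> "pong"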
2 changes: 1 addition & 1 deletion src/middlewared/middlewared/test/integration/utils/pool.py

@@ -8,7 +8,7 @@
     sys.path.append(apifolder)
     from auto_config import pool_name
 except ImportError:
-    pool_name = None
+    pool_name = os.environ["ZPOOL"]
 
 logger = logging.getLogger(__name__)
(diff for the remaining changed file did not load)