Allow CLUSTER: locations to be used for cloud sync tasks (#8509)
(cherry picked from commit 3c06a99)
themylogin committed May 3, 2022
1 parent 1524b48 commit a6a86e9
Showing 12 changed files with 114 additions and 50 deletions.
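Note: the net effect of this commit is that a cloud sync task's path may now be a gluster location written with the CLUSTER: prefix instead of a /mnt dataset path. A minimal sketch of creating such a task through the middleware client (the credential id, volume name, bucket, and schedule are illustrative assumptions; only the CLUSTER: path syntax comes from the tests below):

    from middlewared.client import Client

    with Client("ws://truenas.local/websocket") as c:
        c.call("auth.login", "root", "password")
        c.call("cloudsync.create", {
            "description": "push gluster dir to S3",
            "path": "CLUSTER:gvol01/cloudsync_01",   # cluster path, now accepted
            "credentials": 1,                        # id of an existing cloud credential
            "attributes": {"bucket": "bucket01", "folder": ""},
            "direction": "PUSH",
            "transfer_mode": "COPY",
            "schedule": {"minute": "00"},
        })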
Empty file.
56 changes: 56 additions & 0 deletions cluster-tests/tests/cloudsync/test_cloudsync.py
@@ -0,0 +1,56 @@
+import pytest
+
+from middlewared.client import ClientException
+from middlewared.test.integration.assets.cloud_sync import *
+
+from config import CLUSTER_INFO, CLUSTER_IPS
+from utils import make_request, ssh_test, wait_on_job
+
+LOCAL_PATH = f'/cluster/{CLUSTER_INFO["GLUSTER_VOLUME"]}/cloudsync_01'
+CLUSTER_PATH = f'CLUSTER:{CLUSTER_INFO["GLUSTER_VOLUME"]}/cloudsync_01'
+
+
+def test_works():
+    res = ssh_test(CLUSTER_IPS[0], f'mkdir {LOCAL_PATH}')
+    assert res['result'], res['stderr']
+    try:
+        res = ssh_test(CLUSTER_IPS[0], f'echo test > {LOCAL_PATH}/file01')
+        assert res['result'], res['stderr']
+
+        try:
+            with local_s3_task({
+                "path": CLUSTER_PATH,
+            }) as task:
+                run_task(task)
+
+                res = ssh_test(CLUSTER_IPS[0], f'cat {LOCAL_PATH}/file01')
+                assert res['result'], res['stderr']
+
+                assert res['output'] == 'test\n'
+        finally:
+            res = ssh_test(CLUSTER_IPS[0], f'rm -rf {LOCAL_PATH}')
+            assert res['result'], res['stderr']
+    finally:
+        res = ssh_test(CLUSTER_IPS[0], f'rm -rf {LOCAL_PATH}')
+        assert res['result'], res['stderr']
+
+
+def test_invalid_cluster_path():
+    with pytest.raises(ClientException) as e:
+        with local_s3_task({
+            "path": CLUSTER_PATH,
+        }) as task:
+            run_task(task)
+
+    assert str(e.value) == f"[EFAULT] Directory '{CLUSTER_PATH}' does not exist"
+
+
+def test_cluster_path_snapshot():
+    with pytest.raises(ClientException) as e:
+        with local_s3_task({
+            "path": CLUSTER_PATH,
+            "snapshot": True
+        }):
+            pass
+
+    assert str(e.value) == f"[EINVAL] cloud_sync_create.snapshot: This option can not be used for cluster paths"
6 changes: 1 addition & 5 deletions src/middlewared/middlewared/async_validators.py
@@ -6,11 +6,7 @@
 from middlewared.validators import IpAddress
 
 
-async def check_path_resides_within_volume(verrors, middleware, name, path, gluster_bypass=False):
-
-    # when a sharing service is using gluster, the path checks below do not apply
-    if gluster_bypass:
-        return
+async def check_path_resides_within_volume(verrors, middleware, name, path):
 
     # we need to make sure the sharing service is configured within the zpool
     rp = os.path.realpath(path)
36 changes: 25 additions & 11 deletions src/middlewared/middlewared/plugins/cloud_sync.py
@@ -139,21 +139,34 @@ def get_remote_path(provider, attributes):
     return remote_path
 
 
-def check_local_path(path):
+def check_local_path(path, *, check_mountpoint=True, error_text_path=None):
+    error_text_path = error_text_path or path
+
     if not os.path.exists(path):
-        raise CallError(f"Directory {path!r} does not exist")
+        raise CallError(f"Directory {error_text_path!r} does not exist")
 
     if not os.path.isdir(path):
-        raise CallError(f"{path!r} is not a directory")
+        raise CallError(f"{error_text_path!r} is not a directory")
 
-    if not os.path.normpath(path).startswith("/mnt/") or os.stat(path).st_dev == os.stat("/mnt").st_dev:
-        raise CallError(f"Directory {path!r} must reside within volume mount point")
+    if check_mountpoint:
+        if not os.path.normpath(path).startswith("/mnt/") or os.stat(path).st_dev == os.stat("/mnt").st_dev:
+            raise CallError(f"Directory {error_text_path!r} must reside within volume mount point")
 
 
 async def rclone(middleware, job, cloud_sync, dry_run):
     await middleware.call("network.general.will_perform_activity", "cloud_sync")
 
-    await middleware.run_in_thread(check_local_path, cloud_sync["path"])
+    if await middleware.call("filesystem.is_cluster_path", cloud_sync["path"]):
+        path = await middleware.call("filesystem.resolve_cluster_path", cloud_sync["path"])
+        await middleware.run_in_thread(
+            check_local_path,
+            path,
+            check_mountpoint=False,
+            error_text_path=cloud_sync["path"],
+        )
+    else:
+        path = cloud_sync["path"]
+        await middleware.run_in_thread(check_local_path, path)
 
     # Use a temporary file to store rclone file
     async with RcloneConfig(cloud_sync) as config:
@@ -189,7 +202,6 @@ async def rclone(middleware, job, cloud_sync, dry_run):
     args += [cloud_sync["transfer_mode"].lower()]
 
     snapshot = None
-    path = cloud_sync["path"]
     if cloud_sync["direction"] == "PUSH":
         if cloud_sync["snapshot"]:
             dataset, recursive = get_dataset_recursive(
@@ -830,16 +842,18 @@ async def _validate(self, verrors, name, data):
         if limit1["time"] >= limit2["time"]:
             verrors.add(f"{name}.bwlimit.{i + 1}.time", f"Invalid time order: {limit1['time']}, {limit2['time']}")
 
-        await self.validate_path_field(data, name, verrors)
+        await self.validate_path_field(data, name, verrors, allow_cluster=True)
 
         if data["snapshot"]:
            if data["direction"] != "PUSH":
                verrors.add(f"{name}.snapshot", "This option can only be enabled for PUSH tasks")
            if data["transfer_mode"] == "MOVE":
                verrors.add(f"{name}.snapshot", "This option can not be used for MOVE transfer mode")
-           if await self.middleware.call("pool.dataset.query",
-                                         [["name", "^", os.path.relpath(data["path"], "/mnt") + "/"],
-                                          ["type", "=", "FILESYSTEM"]]):
+           if await self.middleware.call("filesystem.is_cluster_path", data["path"]):
+               verrors.add(f"{name}.snapshot", "This option can not be used for cluster paths")
+           elif await self.middleware.call("pool.dataset.query",
+                                           [["name", "^", os.path.relpath(data["path"], "/mnt") + "/"],
+                                            ["type", "=", "FILESYSTEM"]]):
                verrors.add(f"{name}.snapshot", "This option is only available for datasets that have no further "
                            "nesting")
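In effect, for a CLUSTER: path the rclone runner now resolves the gluster FUSE location first, skips the /mnt mountpoint check, and keeps reporting errors against the user-supplied path. A sketch of the resulting call, assuming a volume named gvol01 (mirroring the LOCAL_PATH/CLUSTER_PATH pair in the cluster test above):

    check_local_path(
        "/cluster/gvol01/cloudsync_01",                 # resolved FUSE path
        check_mountpoint=False,                         # gluster mounts live outside /mnt
        error_text_path="CLUSTER:gvol01/cloudsync_01",  # path the user sees in errors
    )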
4 changes: 4 additions & 0 deletions src/middlewared/middlewared/plugins/filesystem.py
@@ -44,6 +44,10 @@ def set_immutable(self, set_flag, path):
         """
         chflags.set_immutable(path, set_flag)
 
+    @private
+    def is_cluster_path(self, path):
+        return path.startswith(FuseConfig.FUSE_PATH_SUBST.value)
+
     @private
     def resolve_cluster_path(self, path, ignore_ctdb=False):
         """
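The new helper is a plain prefix test. Judging by the CLUSTER: literal used in the cluster test above, FuseConfig.FUSE_PATH_SUBST.value is that prefix, so the check behaves roughly like this sketch (the prefix value is an assumption):

    def is_cluster_path(path: str) -> bool:
        # Rough stand-in for filesystem.is_cluster_path
        return path.startswith("CLUSTER:")

    assert is_cluster_path("CLUSTER:gvol01/cloudsync_01")
    assert not is_cluster_path("/mnt/tank/cloudsync_01")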
3 changes: 1 addition & 2 deletions src/middlewared/middlewared/plugins/filesystem_/acl_linux.py
@@ -6,7 +6,6 @@
 from pathlib import Path
 
 from middlewared.service import private, CallError, ValidationErrors, Service
-from middlewared.plugins.cluster_linux.utils import FuseConfig
 from .acl_base import ACLBase, ACLType
 
 
@@ -32,7 +31,7 @@ def acltool(self, path, action, uid, gid, options):
             raise CallError(f"acltool [{action}] on path {path} failed with error: [{acltool.stderr.decode().strip()}]")
 
     def _common_perm_path_validate(self, schema, data, verrors):
-        is_cluster = data['path'].startswith(FuseConfig.FUSE_PATH_SUBST.value)
+        is_cluster = self.middleware.call_sync('filesystem.is_cluster_path', data['path'])
         try:
             data['path'] = self.middleware.call_sync('filesystem.resolve_cluster_path', data['path'])
         except CallError as e:
5 changes: 2 additions & 3 deletions src/middlewared/middlewared/plugins/smb.py
@@ -1439,12 +1439,11 @@ async def validate(self, data, schema_name, verrors, old=None):
             verrors.add(f'{schema_name}.home',
                         'Only one share is allowed to be a home share.')
 
-        bypass = bool(data['cluster_volname'])
-
         await self.cluster_share_validate(data, schema_name, verrors)
 
         if data['path']:
-            await self.validate_path_field(data, schema_name, verrors, bypass=bypass)
+            if not data['cluster_volname']:
+                await self.validate_path_field(data, schema_name, verrors)
 
         """
         When path is not a clustervolname, legacy behavior is to make all path components
14 changes: 10 additions & 4 deletions src/middlewared/middlewared/service.py
@@ -1091,10 +1091,16 @@ async def sharing_task_extend_context(self, rows, extra):
         }
 
     @private
-    async def validate_path_field(self, data, schema, verrors, bypass=False):
-        await check_path_resides_within_volume(
-            verrors, self.middleware, f'{schema}.{self.path_field}', data.get(self.path_field), gluster_bypass=bypass,
-        )
+    async def validate_path_field(self, data, schema, verrors, *, allow_cluster=False):
+        name = f'{schema}.{self.path_field}'
+        path = data[self.path_field]
+
+        if await self.middleware.call('filesystem.is_cluster_path', path):
+            if not allow_cluster:
+                verrors.add(name, 'Cluster path is not allowed')
+        else:
+            await check_path_resides_within_volume(verrors, self.middleware, name, path)
 
         return verrors
 
     @private
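This centralizes the cluster-path policy: instead of threading a bypass flag down to the volume check, a service either opts in to cluster paths or collects a uniform validation error. Condensed from the call sites in this commit:

    # cloud_sync.py opts in; services that keep the default now get a
    # 'Cluster path is not allowed' error for CLUSTER: paths.
    await self.validate_path_field(data, name, verrors, allow_cluster=True)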
4 changes: 4 additions & 0 deletions src/middlewared/middlewared/test/integration/assets/cloud_sync.py
@@ -80,3 +80,7 @@ def local_s3_task(params=None, credential_params=None):
         **params,
     }) as t:
         yield t
+
+
+def run_task(task, timeout=120):
+    call("cloudsync.sync", task["id"], job=True, timeout=timeout)
11 changes: 9 additions & 2 deletions src/middlewared/middlewared/test/integration/utils/client.py
@@ -11,8 +11,15 @@
 
 @contextlib.contextmanager
 def client(py_exceptions=True):
-    with Client(f"ws://{os.environ['MIDDLEWARE_TEST_IP']}/websocket", py_exceptions=py_exceptions) as c:
-        c.call("auth.login", "root", os.environ["MIDDLEWARE_TEST_PASSWORD"])
+    if "NODE_A_IP" in os.environ:
+        host = os.environ["NODE_A_IP"]
+        password = os.environ["APIPASS"]
+    else:
+        host = os.environ["MIDDLEWARE_TEST_IP"]
+        password = os.environ["MIDDLEWARE_TEST_PASSWORD"]
+
+    with Client(f"ws://{host}/websocket", py_exceptions=py_exceptions) as c:
+        c.call("auth.login", "root", password)
         yield c
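The test client now targets the cluster environment when its variables are set. A sketch with made-up values; core.ping is a standard middleware call, used here only to show the connection works:

    import os

    os.environ["NODE_A_IP"] = "10.0.0.10"   # cluster CI exports these two
    os.environ["APIPASS"] = "abcd1234"

    with client() as c:                     # connects to ws://10.0.0.10/websocket
        assert c.call("core.ping") == "pong"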
2 changes: 1 addition & 1 deletion src/middlewared/middlewared/test/integration/utils/pool.py
@@ -8,7 +8,7 @@
     sys.path.append(apifolder)
     from auto_config import pool_name
 except ImportError:
-    pool_name = None
+    pool_name = os.environ["ZPOOL"]
 
 logger = logging.getLogger(__name__)
 
23 changes: 1 addition & 22 deletions tests/api2/test_cloud_sync.py
@@ -1,10 +1,9 @@
 import contextlib
 import re
-import time
 
 import pytest
 
-from middlewared.test.integration.assets.cloud_sync import credential, task, local_s3_credential, local_s3_task
+from middlewared.test.integration.assets.cloud_sync import credential, task, local_s3_credential, local_s3_task, run_task
 from middlewared.test.integration.assets.ftp import anonymous_ftp_server, ftp_server_with_user_account
 from middlewared.test.integration.assets.pool import dataset
 from middlewared.test.integration.utils import call, pool, ssh
@@ -13,32 +12,12 @@
 import os
 apifolder = os.getcwd()
 sys.path.append(apifolder)
-from functions import PUT, POST, GET, DELETE
 from auto_config import dev_test
 reason = 'Skip for testing'
 # comment pytestmark for development testing with --dev-test
 pytestmark = pytest.mark.skipif(dev_test, reason=reason)
 
 
-def run_task(task):
-    result = POST(f"/cloudsync/id/{task['id']}/sync/")
-    assert result.status_code == 200, result.text
-    for i in range(120):
-        result = GET(f"/cloudsync/id/{task['id']}/")
-        assert result.status_code == 200, result.text
-        state = result.json()
-        if state["job"] is None:
-            time.sleep(1)
-            continue
-        if state["job"]["state"] in ["WAITING", "RUNNING"]:
-            time.sleep(1)
-            continue
-        assert state["job"]["state"] == "SUCCESS", state
-        return
-
-    assert False, state
-
-
 def test_include():
     with local_s3_task({
         "include": ["/office/**", "/work/**"],
