diff --git a/ch_backup/backup/layout.py b/ch_backup/backup/layout.py index 81f5c755..30ea387c 100644 --- a/ch_backup/backup/layout.py +++ b/ch_backup/backup/layout.py @@ -120,19 +120,18 @@ def upload_access_control_file(self, backup_name: str, file_name: str) -> None: raise StorageError(msg) from e def upload_access_control_files( - self, backup_name: str, file_names: List[str] + self, path: str, backup_name: str, local_path: str, file_names: List[str] ) -> None: """ Upload access control list. """ - local_path = self._access_control_path remote_path = _access_control_data_path( self.get_backup_path(backup_name), ACCESS_CONTROL_FNAME ) try: logging.debug('Uploading access control data "{}"', local_path) self._storage_loader.upload_files_tarball( - self._access_control_path, + path, remote_path, files=file_names, encryption=True, @@ -368,14 +367,16 @@ def get_table_create_statement( remote_path = _table_metadata_path(backup_meta.path, db_name, table_name) return self._storage_loader.download_data(remote_path, encryption=True) - def download_access_control_file(self, backup_name: str, file_name: str) -> None: + def download_access_control_file( + self, local_path: str, backup_name: str, file_name: str + ) -> None: """ Download access control object metadata and save on disk. """ remote_path = _access_control_data_path( self.get_backup_path(backup_name), file_name ) - local_path = os.path.join(self._access_control_path, file_name) + local_path = os.path.join(local_path, file_name) logging.debug( 'Downloading access control metadata "{}" to "{}', remote_path, local_path ) @@ -385,14 +386,13 @@ def download_access_control_file(self, backup_name: str, file_name: str) -> None msg = f"Failed to download access control metadata file {remote_path}" raise StorageError(msg) from e - def download_access_control(self, backup_name: str) -> None: + def download_access_control(self, local_path: str, backup_name: str) -> None: """ Download access control object metadata and save on disk. """ remote_path = _access_control_data_path( self.get_backup_path(backup_name), ACCESS_CONTROL_FNAME ) - local_path = self._access_control_path logging.debug( 'Downloading access control metadata "{}" to "{}', remote_path, local_path ) diff --git a/ch_backup/config.py b/ch_backup/config.py index a2782657..3eda1f2d 100644 --- a/ch_backup/config.py +++ b/ch_backup/config.py @@ -20,6 +20,7 @@ def _as_seconds(t: str) -> int: "data_path": "/var/lib/clickhouse", "metadata_path": "/var/lib/clickhouse/metadata", "access_control_path": "/var/lib/clickhouse/access", + "tmp_path": "/var/lib/clickhouse/_mdb_tmp", "zk_access_control_path": "/clickhouse/access", "config_dir": "/etc/clickhouse-server/config.d/", "preprocessed_config_path": "/var/lib/clickhouse/preprocessed_configs/config.xml", diff --git a/ch_backup/logic/access.py b/ch_backup/logic/access.py index 7d3f1fc5..e2bcfd54 100644 --- a/ch_backup/logic/access.py +++ b/ch_backup/logic/access.py @@ -4,7 +4,6 @@ import os import re -import shutil from typing import Any, Dict, List, Sequence, Union from kazoo.client import KazooClient @@ -14,7 +13,13 @@ from ch_backup.backup.metadata import BackupStorageFormat from ch_backup.backup_context import BackupContext from ch_backup.logic.backup_manager import BackupManager -from ch_backup.util import chown_dir_contents +from ch_backup.util import ( + chown_dir_contents, + chown_path, + copy_directory_content, + create_dir, + temporary_directory, +) CH_MARK_FILE = "need_rebuild_lists.mark" @@ -28,13 +33,39 @@ def backup(self, context: BackupContext) -> None: """ Backup access control entities. """ - objects = context.ch_ctl.get_access_control_objects() - context.backup_meta.set_access_control(objects) - access_control = context.backup_meta.access_control - if self._has_replicated_access(context): - self._backup_replicated(access_control.acl_ids, context) - self._backup_local(access_control.acl_ids, context) + clickhouse_access_path = context.ch_ctl_conf["access_control_path"] + backup_tmp_path = os.path.join( + context.ch_ctl_conf["tmp_path"], context.backup_meta.name + ) + user = context.ch_ctl_conf["user"] + group = context.ch_ctl_conf["group"] + + create_dir(clickhouse_access_path, user, group) + with temporary_directory(backup_tmp_path, user, group): + objects = context.ch_ctl.get_access_control_objects() + context.backup_meta.set_access_control(objects) + access_control = context.backup_meta.access_control + + if self._has_replicated_access(context): + self._backup_replicated( + backup_tmp_path, access_control.acl_ids, context + ) + self._backup_local(clickhouse_access_path, backup_tmp_path) + + assert ( + context.backup_meta.access_control.backup_format + == BackupStorageFormat.TAR + ) + chown_dir_contents(user, group, backup_tmp_path) + + acl_file_names = _get_access_control_files(access_control.acl_ids) + context.backup_layout.upload_access_control_files( + backup_tmp_path, + context.backup_meta.name, + backup_tmp_path, + acl_file_names, + ) def restore(self, context: BackupContext) -> None: """ @@ -48,11 +79,35 @@ def restore(self, context: BackupContext) -> None: return has_replicated_access = self._has_replicated_access(context) - mark_to_rebuild = not has_replicated_access - self._restore_local(acl_ids, context, mark_to_rebuild=mark_to_rebuild) - if has_replicated_access: - self._restore_replicated(acl_ids, acl_meta, context) + clickhouse_access_path = context.ch_ctl_conf["access_control_path"] + restore_tmp_path = os.path.join( + context.ch_ctl_conf["tmp_path"], context.backup_meta.name + ) + user = context.ch_ctl_conf["user"] + group = context.ch_ctl_conf["group"] + + create_dir(clickhouse_access_path, user, group) + with temporary_directory(restore_tmp_path, user, group): + if ( + context.backup_meta.access_control.backup_format + == BackupStorageFormat.TAR + ): + context.backup_layout.download_access_control( + restore_tmp_path, context.backup_meta.name + ) + else: + for name in _get_access_control_files(acl_ids): + context.backup_layout.download_access_control_file( + restore_tmp_path, context.backup_meta.name, name + ) + + if has_replicated_access: + self._restore_replicated(restore_tmp_path, acl_ids, acl_meta, context) + else: + self._restore_local( + restore_tmp_path, clickhouse_access_path, user, group + ) def fix_admin_user(self, context: BackupContext, dry_run: bool = True) -> None: """ @@ -109,7 +164,7 @@ def fix_admin_user(self, context: BackupContext, dry_run: bool = True) -> None: logging.debug(f"Node {zk_path} not found.") # cleanup SQL file - file_path = _get_access_file_path(context, f"{uuid}.sql") + file_path = os.path.join(context.ch_ctl_conf["access_control_path"], f"{uuid}.sql") logging.debug(f"Removing file {file_path}") try: if dry_run: @@ -124,27 +179,22 @@ def fix_admin_user(self, context: BackupContext, dry_run: bool = True) -> None: def _clean_user_uuid(self, raw_str: str) -> str: return re.sub(r"EXCEPT ID\('(.+)'\)", "", raw_str) - def _backup_local(self, acl_ids: Sequence[str], context: BackupContext) -> None: + def _backup_local(self, clickhouse_access_path: str, backup_tmp_path: str) -> None: """ - Backup access entities from local storage. + Backup access entities from local storage to temporary folder. """ - assert ( - context.backup_meta.access_control.backup_format == BackupStorageFormat.TAR - ) - logging.debug(f"Backupping {len(acl_ids)} access entities from local storage") - acl_file_names = _get_access_control_files(acl_ids) - context.backup_layout.upload_access_control_files( - context.backup_meta.name, acl_file_names + logging.debug( + "Backupping access entities from local storage to {}", backup_tmp_path ) + copy_directory_content(clickhouse_access_path, backup_tmp_path) def _backup_replicated( - self, acl_list: Sequence[str], context: BackupContext + self, backup_tmp_path: str, acl_list: Sequence[str], context: BackupContext ) -> None: """ Backup access entities from replicated storage (ZK/CK). """ - _ensure_access_control_path(context) logging.debug( f"Backupping {len(acl_list)} access entities from replicated storage" ) @@ -152,34 +202,24 @@ def _backup_replicated( for uuid in acl_list: uuid_zk_path = _get_access_zk_path(context, f"/uuid/{uuid}") data, _ = zk_client.get(uuid_zk_path) - _file_create(context, f"{uuid}.sql", data.decode()) - _chown_access_control_dir(context) + _create_access_file(backup_tmp_path, f"{uuid}.sql", data.decode()) def _restore_local( self, - acl_list: Sequence[str], - context: BackupContext, - mark_to_rebuild: bool = True, + restore_tmp_path: str, + clickhouse_access_path: str, + user: str, + group: str, ) -> None: """ Restore access entities to local storage. """ - _ensure_access_control_path(context) - logging.debug(f"Restoring {len(acl_list)} access entities to local storage") - - if context.backup_meta.access_control.backup_format == BackupStorageFormat.TAR: - context.backup_layout.download_access_control(context.backup_meta.name) - else: - for name in _get_access_control_files(acl_list): - context.backup_layout.download_access_control_file( - context.backup_meta.name, name - ) - - if mark_to_rebuild: - self._mark_to_rebuild(context) + copy_directory_content(restore_tmp_path, clickhouse_access_path) + self._mark_to_rebuild(clickhouse_access_path, user, group) def _restore_replicated( self, + restore_tmp_path: str, acl_list: Sequence[str], acl_meta: Dict[str, Dict[str, Any]], context: BackupContext, @@ -202,7 +242,7 @@ def _restore_replicated( name, obj_char = meta_data["name"], meta_data["char"] # restore object data - file_path = _get_access_file_path(context, f"{uuid}.sql") + file_path = os.path.join(restore_tmp_path, f"{uuid}.sql") with open(file_path, "r", encoding="utf-8") as file: data = file.read() uuid_zk_path = _get_access_zk_path(context, f"/uuid/{uuid}") @@ -212,14 +252,16 @@ def _restore_replicated( uuid_zk_path = _get_access_zk_path(context, f"/{obj_char}/{name}") _zk_upsert_data(zk_client, uuid_zk_path, uuid) - def _mark_to_rebuild(self, context: BackupContext) -> None: + def _mark_to_rebuild( + self, clickhouse_access_path: str, user: str, group: str + ) -> None: """ Creates special mark file to rebuild the lists. """ - mark_file = _get_access_file_path(context, CH_MARK_FILE) + mark_file = os.path.join(clickhouse_access_path, CH_MARK_FILE) with open(mark_file, "a", encoding="utf-8"): pass - _chown_path(context, mark_file) + chown_path(mark_file, user, group) def _has_replicated_access(self, context: BackupContext) -> bool: return ( @@ -232,11 +274,8 @@ def _get_access_control_files(objects: Sequence[str]) -> List[str]: """ Return list of file to be backuped/restored. """ - return [f"{obj}.sql" for obj in objects] - -def _get_access_file_path(context: BackupContext, file_name: str) -> str: - return os.path.join(context.ch_ctl_conf["access_control_path"], file_name) + return [f"{obj}.sql" for obj in objects] def _get_access_zk_path(context: BackupContext, zk_path: str) -> str: @@ -259,28 +298,12 @@ def _zk_upsert_data(zk: KazooClient, path: str, value: Union[str, bytes]) -> Non zk.create(path, value, makepath=True) -def _file_create(context: BackupContext, file_name: str, file_content: str = "") -> str: - file_path = _get_access_file_path(context, file_name) +def _create_access_file( + backup_tmp_path: str, file_name: str, file_content: str = "" +) -> str: + file_path = os.path.join(backup_tmp_path, file_name) logging.debug(f'Creating "{file_path}" access entity file') with open(file_path, "w", encoding="utf-8") as file: file.write(file_content) return file_path - - -def _ensure_access_control_path(context: BackupContext) -> None: - acl_path = context.ch_ctl_conf["access_control_path"] - os.makedirs(acl_path, exist_ok=True) - _chown_path(context, acl_path) - - -def _chown_access_control_dir(context: BackupContext) -> None: - ch_config = context.ch_ctl_conf - chown_dir_contents( - ch_config["user"], ch_config["group"], ch_config["access_control_path"] - ) - - -def _chown_path(context: BackupContext, path: str) -> None: - ch_config = context.ch_ctl_conf - shutil.chown(path, ch_config["user"], ch_config["group"]) diff --git a/ch_backup/util.py b/ch_backup/util.py index e60a871e..b89a35c6 100644 --- a/ch_backup/util.py +++ b/ch_backup/util.py @@ -444,3 +444,51 @@ def replace_macros(string: str, macros: dict) -> str: pattern=r"{([^{}]+)}", repl=lambda m: macros.get(m.group(1), m.group(0)), ) + + +def chown_path(path: str, user: str, group: str) -> None: + """ + Interface for chown + """ + shutil.chown(path, user, group) + + +def copy_directory_content(from_path_dir: str, to_path_dir: str) -> None: + """ + Copy all files from one directory to another. + """ + if to_path_dir[-1] != "/": + to_path_dir += "/" + for subpath in os.listdir(from_path_dir): + subpath = os.path.join(from_path_dir, subpath) + shutil.copy(subpath, to_path_dir) + + +def create_dir(path: str, user: str, group: str) -> None: + """ + Create dir and chown it. + """ + os.makedirs(path, exist_ok=True) + chown_path(path, user, group) + + +class temporary_directory: + """ + Class to automatically create and remove temporary directory. + + In case of exception, do not remove the folder. + """ + + def __init__(self, path: str, user: str, group: str): + self._path = path + self._user = user + self._group = group + + def __enter__(self) -> None: + create_dir(self._path, self._user, self._group) + + def __exit__(self, exc_type, exc_value, traceback): + if exc_type is not None: + logging.warning("Dont remove tmp dir {} due to exception.", self._path) + return + shutil.rmtree(self._path) diff --git a/tests/integration/features/access_control_replicated_backup.feature b/tests/integration/features/access_control_replicated_backup.feature index 93165c78..8de9bb8d 100644 --- a/tests/integration/features/access_control_replicated_backup.feature +++ b/tests/integration/features/access_control_replicated_backup.feature @@ -45,14 +45,7 @@ Feature: Backup and restore functionality of replicated access control entities Then we got the following backups on clickhouse01 | num | state | data_count | link_count | | 0 | created | 1 | 0 | - When we execute command on clickhouse01 - """ - find /var/lib/clickhouse/access -name "*.sql" | wc -l - """ - Then we get response - """ - 5 - """ + @require_version_22.3 Scenario: check ZK data after restore to replicated storage