Skip to content

Commit

Permalink
MDB-21949 Use tmp folder for restore/backup access control entities
Browse files Browse the repository at this point in the history
  • Loading branch information
MikhailBurdukov committed May 24, 2024
1 parent 13f7d82 commit ce6992a
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 85 deletions.
14 changes: 7 additions & 7 deletions ch_backup/backup/layout.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,18 @@ def upload_access_control_file(self, backup_name: str, file_name: str) -> None:
raise StorageError(msg) from e

def upload_access_control_files(
self, backup_name: str, file_names: List[str]
self, path: str, backup_name: str, local_path: str, file_names: List[str]
) -> None:
"""
Upload access control list.
"""
local_path = self._access_control_path
remote_path = _access_control_data_path(
self.get_backup_path(backup_name), ACCESS_CONTROL_FNAME
)
try:
logging.debug('Uploading access control data "{}"', local_path)
self._storage_loader.upload_files_tarball(
self._access_control_path,
path,
remote_path,
files=file_names,
encryption=True,
Expand Down Expand Up @@ -368,14 +367,16 @@ def get_table_create_statement(
remote_path = _table_metadata_path(backup_meta.path, db_name, table_name)
return self._storage_loader.download_data(remote_path, encryption=True)

def download_access_control_file(self, backup_name: str, file_name: str) -> None:
def download_access_control_file(
self, local_path: str, backup_name: str, file_name: str
) -> None:
"""
Download access control object metadata and save on disk.
"""
remote_path = _access_control_data_path(
self.get_backup_path(backup_name), file_name
)
local_path = os.path.join(self._access_control_path, file_name)
local_path = os.path.join(local_path, file_name)
logging.debug(
'Downloading access control metadata "{}" to "{}', remote_path, local_path
)
Expand All @@ -385,14 +386,13 @@ def download_access_control_file(self, backup_name: str, file_name: str) -> None
msg = f"Failed to download access control metadata file {remote_path}"
raise StorageError(msg) from e

def download_access_control(self, backup_name: str) -> None:
def download_access_control(self, local_path: str, backup_name: str) -> None:
"""
Download access control object metadata and save on disk.
"""
remote_path = _access_control_data_path(
self.get_backup_path(backup_name), ACCESS_CONTROL_FNAME
)
local_path = self._access_control_path
logging.debug(
'Downloading access control metadata "{}" to "{}', remote_path, local_path
)
Expand Down
1 change: 1 addition & 0 deletions ch_backup/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def _as_seconds(t: str) -> int:
"data_path": "/var/lib/clickhouse",
"metadata_path": "/var/lib/clickhouse/metadata",
"access_control_path": "/var/lib/clickhouse/access",
"tmp_path": "/var/lib/clickhouse/_mdb_tmp",
"zk_access_control_path": "/clickhouse/access",
"config_dir": "/etc/clickhouse-server/config.d/",
"preprocessed_config_path": "/var/lib/clickhouse/preprocessed_configs/config.xml",
Expand Down
163 changes: 93 additions & 70 deletions ch_backup/logic/access.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import os
import re
import shutil
from typing import Any, Dict, List, Sequence, Union

from kazoo.client import KazooClient
Expand All @@ -14,7 +13,13 @@
from ch_backup.backup.metadata import BackupStorageFormat
from ch_backup.backup_context import BackupContext
from ch_backup.logic.backup_manager import BackupManager
from ch_backup.util import chown_dir_contents
from ch_backup.util import (
chown_dir_contents,
chown_path,
copy_directory_content,
create_dir,
temporary_directory,
)

CH_MARK_FILE = "need_rebuild_lists.mark"

Expand All @@ -28,13 +33,39 @@ def backup(self, context: BackupContext) -> None:
"""
Backup access control entities.
"""
objects = context.ch_ctl.get_access_control_objects()
context.backup_meta.set_access_control(objects)
access_control = context.backup_meta.access_control

if self._has_replicated_access(context):
self._backup_replicated(access_control.acl_ids, context)
self._backup_local(access_control.acl_ids, context)
clickhouse_access_path = context.ch_ctl_conf["access_control_path"]
backup_tmp_path = os.path.join(
context.ch_ctl_conf["tmp_path"], context.backup_meta.name
)
user = context.ch_ctl_conf["user"]
group = context.ch_ctl_conf["group"]

create_dir(clickhouse_access_path, user, group)
with temporary_directory(backup_tmp_path, user, group):
objects = context.ch_ctl.get_access_control_objects()
context.backup_meta.set_access_control(objects)
access_control = context.backup_meta.access_control

if self._has_replicated_access(context):
self._backup_replicated(
backup_tmp_path, access_control.acl_ids, context
)
self._backup_local(clickhouse_access_path, backup_tmp_path)

assert (
context.backup_meta.access_control.backup_format
== BackupStorageFormat.TAR
)
chown_dir_contents(user, group, backup_tmp_path)

acl_file_names = _get_access_control_files(access_control.acl_ids)
context.backup_layout.upload_access_control_files(
backup_tmp_path,
context.backup_meta.name,
backup_tmp_path,
acl_file_names,
)

def restore(self, context: BackupContext) -> None:
"""
Expand All @@ -48,11 +79,35 @@ def restore(self, context: BackupContext) -> None:
return

has_replicated_access = self._has_replicated_access(context)
mark_to_rebuild = not has_replicated_access

self._restore_local(acl_ids, context, mark_to_rebuild=mark_to_rebuild)
if has_replicated_access:
self._restore_replicated(acl_ids, acl_meta, context)
clickhouse_access_path = context.ch_ctl_conf["access_control_path"]
restore_tmp_path = os.path.join(
context.ch_ctl_conf["tmp_path"], context.backup_meta.name
)
user = context.ch_ctl_conf["user"]
group = context.ch_ctl_conf["group"]

create_dir(clickhouse_access_path, user, group)
with temporary_directory(restore_tmp_path, user, group):
if (
context.backup_meta.access_control.backup_format
== BackupStorageFormat.TAR
):
context.backup_layout.download_access_control(
restore_tmp_path, context.backup_meta.name
)
else:
for name in _get_access_control_files(acl_ids):
context.backup_layout.download_access_control_file(
restore_tmp_path, context.backup_meta.name, name
)

if has_replicated_access:
self._restore_replicated(restore_tmp_path, acl_ids, acl_meta, context)
else:
self._restore_local(
restore_tmp_path, clickhouse_access_path, user, group
)

def fix_admin_user(self, context: BackupContext, dry_run: bool = True) -> None:
"""
Expand Down Expand Up @@ -109,7 +164,7 @@ def fix_admin_user(self, context: BackupContext, dry_run: bool = True) -> None:
logging.debug(f"Node {zk_path} not found.")

# cleanup SQL file
file_path = _get_access_file_path(context, f"{uuid}.sql")
file_path = os.path.join(context.ch_ctl_conf["access_control_path"], f"{uuid}.sql")
logging.debug(f"Removing file {file_path}")
try:
if dry_run:
Expand All @@ -124,62 +179,47 @@ def fix_admin_user(self, context: BackupContext, dry_run: bool = True) -> None:
def _clean_user_uuid(self, raw_str: str) -> str:
return re.sub(r"EXCEPT ID\('(.+)'\)", "", raw_str)

def _backup_local(self, acl_ids: Sequence[str], context: BackupContext) -> None:
def _backup_local(self, clickhouse_access_path: str, backup_tmp_path: str) -> None:
"""
Backup access entities from local storage.
Backup access entities from local storage to temporary folder.
"""
assert (
context.backup_meta.access_control.backup_format == BackupStorageFormat.TAR
)

logging.debug(f"Backupping {len(acl_ids)} access entities from local storage")
acl_file_names = _get_access_control_files(acl_ids)
context.backup_layout.upload_access_control_files(
context.backup_meta.name, acl_file_names
logging.debug(
"Backupping access entities from local storage to {}", backup_tmp_path
)
copy_directory_content(clickhouse_access_path, backup_tmp_path)

def _backup_replicated(
self, acl_list: Sequence[str], context: BackupContext
self, backup_tmp_path: str, acl_list: Sequence[str], context: BackupContext
) -> None:
"""
Backup access entities from replicated storage (ZK/CK).
"""
_ensure_access_control_path(context)
logging.debug(
f"Backupping {len(acl_list)} access entities from replicated storage"
)
with context.zk_ctl.zk_client as zk_client:
for uuid in acl_list:
uuid_zk_path = _get_access_zk_path(context, f"/uuid/{uuid}")
data, _ = zk_client.get(uuid_zk_path)
_file_create(context, f"{uuid}.sql", data.decode())
_chown_access_control_dir(context)
_create_access_file(backup_tmp_path, f"{uuid}.sql", data.decode())

def _restore_local(
self,
acl_list: Sequence[str],
context: BackupContext,
mark_to_rebuild: bool = True,
restore_tmp_path: str,
clickhouse_access_path: str,
user: str,
group: str,
) -> None:
"""
Restore access entities to local storage.
"""
_ensure_access_control_path(context)
logging.debug(f"Restoring {len(acl_list)} access entities to local storage")

if context.backup_meta.access_control.backup_format == BackupStorageFormat.TAR:
context.backup_layout.download_access_control(context.backup_meta.name)
else:
for name in _get_access_control_files(acl_list):
context.backup_layout.download_access_control_file(
context.backup_meta.name, name
)

if mark_to_rebuild:
self._mark_to_rebuild(context)
copy_directory_content(restore_tmp_path, clickhouse_access_path)
self._mark_to_rebuild(clickhouse_access_path, user, group)

def _restore_replicated(
self,
restore_tmp_path: str,
acl_list: Sequence[str],
acl_meta: Dict[str, Dict[str, Any]],
context: BackupContext,
Expand All @@ -202,7 +242,7 @@ def _restore_replicated(
name, obj_char = meta_data["name"], meta_data["char"]

# restore object data
file_path = _get_access_file_path(context, f"{uuid}.sql")
file_path = os.path.join(restore_tmp_path, f"{uuid}.sql")
with open(file_path, "r", encoding="utf-8") as file:
data = file.read()
uuid_zk_path = _get_access_zk_path(context, f"/uuid/{uuid}")
Expand All @@ -212,14 +252,16 @@ def _restore_replicated(
uuid_zk_path = _get_access_zk_path(context, f"/{obj_char}/{name}")
_zk_upsert_data(zk_client, uuid_zk_path, uuid)

def _mark_to_rebuild(self, context: BackupContext) -> None:
def _mark_to_rebuild(
self, clickhouse_access_path: str, user: str, group: str
) -> None:
"""
Creates special mark file to rebuild the lists.
"""
mark_file = _get_access_file_path(context, CH_MARK_FILE)
mark_file = os.path.join(clickhouse_access_path, CH_MARK_FILE)
with open(mark_file, "a", encoding="utf-8"):
pass
_chown_path(context, mark_file)
chown_path(mark_file, user, group)

def _has_replicated_access(self, context: BackupContext) -> bool:
return (
Expand All @@ -232,11 +274,8 @@ def _get_access_control_files(objects: Sequence[str]) -> List[str]:
"""
Return list of file to be backuped/restored.
"""
return [f"{obj}.sql" for obj in objects]


def _get_access_file_path(context: BackupContext, file_name: str) -> str:
return os.path.join(context.ch_ctl_conf["access_control_path"], file_name)
return [f"{obj}.sql" for obj in objects]


def _get_access_zk_path(context: BackupContext, zk_path: str) -> str:
Expand All @@ -259,28 +298,12 @@ def _zk_upsert_data(zk: KazooClient, path: str, value: Union[str, bytes]) -> Non
zk.create(path, value, makepath=True)


def _file_create(context: BackupContext, file_name: str, file_content: str = "") -> str:
file_path = _get_access_file_path(context, file_name)
def _create_access_file(
backup_tmp_path: str, file_name: str, file_content: str = ""
) -> str:
file_path = os.path.join(backup_tmp_path, file_name)
logging.debug(f'Creating "{file_path}" access entity file')
with open(file_path, "w", encoding="utf-8") as file:
file.write(file_content)

return file_path


def _ensure_access_control_path(context: BackupContext) -> None:
acl_path = context.ch_ctl_conf["access_control_path"]
os.makedirs(acl_path, exist_ok=True)
_chown_path(context, acl_path)


def _chown_access_control_dir(context: BackupContext) -> None:
ch_config = context.ch_ctl_conf
chown_dir_contents(
ch_config["user"], ch_config["group"], ch_config["access_control_path"]
)


def _chown_path(context: BackupContext, path: str) -> None:
ch_config = context.ch_ctl_conf
shutil.chown(path, ch_config["user"], ch_config["group"])
48 changes: 48 additions & 0 deletions ch_backup/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,3 +444,51 @@ def replace_macros(string: str, macros: dict) -> str:
pattern=r"{([^{}]+)}",
repl=lambda m: macros.get(m.group(1), m.group(0)),
)


def chown_path(path: str, user: str, group: str) -> None:
    """
    Change the owner and the group of the given path.

    Thin wrapper that delegates to shutil.chown.
    """
    shutil.chown(path, user=user, group=group)


def copy_directory_content(from_path_dir: str, to_path_dir: str) -> None:
    """
    Copy every entry listed in from_path_dir into to_path_dir.

    Entries are copied one by one with shutil.copy, in os.listdir order.
    """
    # The destination is always passed with a trailing slash.
    # NOTE(review): presumably so that a missing destination directory fails
    # instead of being created as a regular file - confirm.
    destination = to_path_dir if to_path_dir[-1] == "/" else to_path_dir + "/"
    for entry in os.listdir(from_path_dir):
        shutil.copy(os.path.join(from_path_dir, entry), destination)


def create_dir(path: str, user: str, group: str) -> None:
    """
    Make sure the directory exists and hand it over to user:group.

    Missing parent directories are created as well; an already existing
    directory is not an error. Only the directory itself is chowned.
    """
    os.makedirs(path, exist_ok=True)
    chown_path(path, user=user, group=group)


class temporary_directory:
    """
    Context manager that creates a directory on enter and removes it on exit.

    If the managed block raises, the directory is kept on disk and the
    exception propagates to the caller.

    NOTE: the directory is created with create_dir, which accepts an already
    existing directory, and a clean exit removes the whole tree - so a
    directory that existed beforehand is removed together with its content.
    """

    def __init__(self, path: str, user: str, group: str):
        self._path = path
        self._user = user
        self._group = group

    def __enter__(self) -> str:
        """
        Create the directory, chown it to user:group and return its path.
        """
        create_dir(self._path, self._user, self._group)
        # Returning the path makes "with temporary_directory(...) as path"
        # usable; callers that ignore the value are not affected.
        return self._path

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """
        Remove the directory unless the managed block raised.
        """
        if exc_type is not None:
            # Keep the directory on disk; returning None lets the exception
            # propagate to the caller.
            logging.warning("Dont remove tmp dir {} due to exception.", self._path)
            return
        shutil.rmtree(self._path)
Loading

0 comments on commit ce6992a

Please sign in to comment.