Merge branch 'main' into MDB-27689-deterministic_restore_and_refactoring
MikhailBurdukov authored Jul 12, 2024
2 parents 9bc3620 + 248e0ce commit 004d839
Showing 16 changed files with 359 additions and 116 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/workflow.yml
@@ -63,6 +63,8 @@ jobs:
           clickhouse: "latest"
         - python: "3.11"
           clickhouse: "latest"
+        - python: "3.12"
+          clickhouse: "latest"
     runs-on: ubuntu-20.04
     steps:
       - uses: actions/checkout@v4
6 changes: 4 additions & 2 deletions ch_backup/backup_context.py
@@ -101,7 +101,7 @@ def backup_layout(self) -> BackupLayout:
         Getter backup_layout
         """
         if not hasattr(self, "_backup_layout"):
-            self._backup_layout = BackupLayout(self._config)
+            self._backup_layout = BackupLayout(self._config_root)
         return self._backup_layout
 
     @backup_layout.setter
@@ -151,7 +151,7 @@ def restore_context(self) -> RestoreContext:
         Getter restore_context
         """
         if not hasattr(self, "_restore_context"):
-            self._restore_context = RestoreContext(self._config)
+            self._restore_context = RestoreContext(self.config)
         return self._restore_context
 
     @restore_context.setter
@@ -208,6 +208,8 @@ def ch_config(self) -> ClickhouseConfig:
         """
         Getter ch_config
         """
+        if not hasattr(self, "_ch_config"):
+            self._ch_config = ClickhouseConfig(self._config_root)
         return self._ch_config
 
     @ch_config.setter
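All three getters now follow the same lazy-initialization pattern: build the collaborator from the root config on first access, cache it on the instance, and keep the setter so callers can still inject a pre-built object. A minimal sketch of the pattern (class and field names here are illustrative, not the real BackupContext members):

class BackupLayout:
    """Stand-in for the real ch_backup.backup.layout.BackupLayout."""

    def __init__(self, config: dict) -> None:
        self.config = config


class LazyContext:
    """Illustrative context with one lazily built collaborator."""

    def __init__(self, config_root: dict) -> None:
        self._config_root = config_root

    @property
    def backup_layout(self) -> BackupLayout:
        # Construct on first access, then cache on the instance.
        if not hasattr(self, "_backup_layout"):
            self._backup_layout = BackupLayout(self._config_root)
        return self._backup_layout

    @backup_layout.setter
    def backup_layout(self, value: BackupLayout) -> None:
        # Callers (e.g. tests) can still inject a replacement.
        self._backup_layout = value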
18 changes: 3 additions & 15 deletions ch_backup/ch_backup.py
@@ -13,13 +13,9 @@
     collect_dedup_info,
     collect_dedup_references_for_batch_backup_deletion,
 )
-from ch_backup.backup.layout import BackupLayout
 from ch_backup.backup.metadata import BackupMetadata, BackupState, TableMetadata
-from ch_backup.backup.restore_context import RestoreContext
 from ch_backup.backup.sources import BackupSources
 from ch_backup.backup_context import BackupContext
-from ch_backup.clickhouse.config import ClickhouseConfig
-from ch_backup.clickhouse.control import ClickhouseCTL
 from ch_backup.clickhouse.models import Database
 from ch_backup.config import Config
 from ch_backup.exceptions import (
@@ -64,18 +60,7 @@ def _context(self) -> BackupContext:
         """
         Create and configure BackupContext
         """
-
         ctx = BackupContext(self._config)
-        ctx.ch_ctl_conf = self._config["clickhouse"]
-        ctx.main_conf = self._config["main"]
-
-        ctx.ch_ctl = ClickhouseCTL(ctx.ch_ctl_conf, ctx.main_conf, ctx.config)
-        ctx.backup_layout = BackupLayout(self._config)
-
-        ctx.config = self._config["backup"]
-        ctx.zk_config = self._config.get("zookeeper")
-        ctx.restore_context = RestoreContext(ctx.config)
-        ctx.ch_config = ClickhouseConfig(self._config)
         return ctx
 
     def reload_config(self, config: Config) -> None:
@@ -192,6 +177,9 @@ def backup(
                 databases,
                 db_tables,
                 schema_only=sources.schema_only,
+                freeze_threads=self._config["multiprocessing"][
+                    "freeze_threads"
+                ],
             )
 
             self._context.backup_meta.state = BackupState.CREATED
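With the eager wiring gone, _context is a single constructor call and the context builds its own collaborators on demand. The new freeze_threads argument suggests tables are now frozen in parallel; a hedged sketch of what that fan-out could look like (the ThreadPoolExecutor usage is my assumption, and freeze_table is assumed from ClickhouseCTL's API):

from concurrent.futures import ThreadPoolExecutor

def freeze_tables(ch_ctl, backup_name, tables, freeze_threads=4):
    # Pre-create shadow/increment.txt so concurrent freezes don't race
    # on creating it (see create_shadow_increment below).
    ch_ctl.create_shadow_increment()
    with ThreadPoolExecutor(max_workers=freeze_threads) as pool:
        futures = [pool.submit(ch_ctl.freeze_table, backup_name, t) for t in tables]
        for future in futures:
            future.result()  # re-raise any freeze error in the caller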
51 changes: 45 additions & 6 deletions ch_backup/clickhouse/control.py
@@ -23,6 +23,7 @@
 from ch_backup.exceptions import ClickhouseBackupError
 from ch_backup.util import (
     chown_dir_contents,
+    chown_file,
     escape,
     list_dir_files,
     retry,
@@ -473,15 +474,32 @@ def system_unfreeze(self, backup_name: str) -> None:
         query_sql = SYSTEM_UNFREEZE_SQL.format(backup_name=backup_name)
         self._ch_client.query(query_sql, timeout=self._unfreeze_timeout)
 
-    def remove_freezed_data(self) -> None:
+    def remove_freezed_data(
+        self, backup_name: Optional[str] = None, table: Optional[Table] = None
+    ) -> None:
         """
         Remove all freezed partitions from all local disks.
         """
-        for disk in self._disks.values():
-            if disk.type == "local":
-                shadow_path = os.path.join(disk.path, "shadow")
-                logging.debug("Removing shadow data: {}", shadow_path)
-                self._remove_shadow_data(shadow_path)
+        if not (backup_name is None) == (table is None):
+            raise RuntimeError(
+                "Both backup_name and table should be None or not None at the same time"
+            )
+
+        if backup_name and table:
+            for table_data_path, disk in table.paths_with_disks:
+                if disk.type == "local":
+                    table_relative_path = os.path.relpath(table_data_path, disk.path)
+                    shadow_path = os.path.join(
+                        disk.path, "shadow", backup_name, table_relative_path
+                    )
+                    logging.debug("Removing shadow data: {}", shadow_path)
+                    self._remove_shadow_data(shadow_path)
+        else:
+            for disk in self._disks.values():
+                if disk.type == "local":
+                    shadow_path = os.path.join(disk.path, "shadow")
+                    logging.debug("Removing shadow data: {}", shadow_path)
+                    self._remove_shadow_data(shadow_path)
 
     def remove_freezed_part(self, part: FrozenPart) -> None:
         """
@@ -775,6 +793,27 @@ def chown_dir(self, dir_path: str) -> None:
             need_recursion,
         )
 
+    def create_shadow_increment(self) -> None:
+        """
+        Create shadow/increment.txt to fix race condition with parallel freeze.
+        Must be used before freezing more than one table at once.
+        https://github.com/ClickHouse/ClickHouse/blob/597a72fd9afd88984abc10b284624c6b4d08368b/src/Common/Increment.h#L20
+        """
+        default_shadow_path = Path(self._root_data_path) / "shadow"
+        increment_path = default_shadow_path / "increment.txt"
+        if os.path.exists(increment_path):
+            return
+        if not os.path.exists(default_shadow_path):
+            os.mkdir(default_shadow_path)
+            self.chown_dir(str(default_shadow_path))
+        with open(increment_path, "w", encoding="utf-8") as file:
+            file.write("0")
+        chown_file(
+            self._ch_ctl_config["user"],
+            self._ch_ctl_config["group"],
+            str(increment_path),
+        )
 
     @retry(OSError)
     def _remove_shadow_data(self, path: str) -> None:
         if path.find("/shadow") == -1:
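create_shadow_increment pre-seeds the counter file that ClickHouse's Increment class otherwise creates lazily on the first FREEZE; two concurrent freezes can race on that creation. A standalone sketch of the seeding step, minus the ownership handling (paths are illustrative):

from pathlib import Path

shadow = Path("/var/lib/clickhouse") / "shadow"
increment = shadow / "increment.txt"
if not increment.exists():
    shadow.mkdir(exist_ok=True)
    increment.write_text("0", encoding="utf-8")
# ClickHouse's Increment now reads 0 and hands out 1, 2, ... deterministically,
# with no window where two freezes both try to create the file.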
2 changes: 2 additions & 0 deletions ch_backup/config.py
@@ -156,6 +156,8 @@ def _as_seconds(t: str) -> int:
         "workers": 4,
         # The number of processes for parts restoring from S3 disks.
         "cloud_storage_restore_workers": 4,
+        # The number of threads for parallel freeze of tables
+        "freeze_threads": 4,
     },
     "pipeline": {
         # Is asynchronous pipelines used (based on Pypeln library)
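Like the other multiprocessing knobs, the new default can be overridden in a user config. A hypothetical override, assuming the usual dict-merge semantics of ch-backup's configuration:

config_overrides = {
    "multiprocessing": {
        "freeze_threads": 8,  # freeze up to 8 tables concurrently
    },
}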
2 changes: 1 addition & 1 deletion ch_backup/logging.py
@@ -121,7 +121,7 @@ def exception(msg, *args, **kwargs):
     Log a message with severity 'ERROR' with exception information.
     """
 
-    with_exception = kwargs.get("exc_info", False)
+    with_exception = kwargs.get("exc_info", True)
     getLogger("ch-backup").opt(exception=with_exception).debug(msg, *args, **kwargs)
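Flipping the exc_info default to True means exception() now attaches the active traceback unless the caller opts out. A loguru-based approximation of the patched helper and the resulting behavior (not the module's exact code; like the original, it emits via .debug):

from loguru import logger

def exception(msg, *args, **kwargs):
    # Mirrors the patched helper: traceback attached unless exc_info=False.
    with_exception = kwargs.pop("exc_info", True)
    logger.opt(exception=with_exception).debug(msg, *args, **kwargs)

try:
    1 / 0
except ZeroDivisionError:
    exception("backup step failed")              # traceback logged by default
    exception("quiet failure", exc_info=False)   # old default: message only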