diff --git a/ch_backup/backup/restore_context.py b/ch_backup/backup/restore_context.py index f0d53521..8bb54bd0 100644 --- a/ch_backup/backup/restore_context.py +++ b/ch_backup/backup/restore_context.py @@ -4,12 +4,23 @@ import json from collections import defaultdict +from enum import Enum from os.path import exists -from typing import Any, Dict, List, Mapping +from typing import Any, Dict, Mapping from ch_backup.backup.metadata import PartMetadata +class PartState(str, Enum): + """ + Represents the state of a data part during restore. + """ + + INVALID = "invalid" + DOWNLOADED = "downloaded" + RESTORED = "restored" + + class RestoreContext: """ Backup restore context. Allows continue restore process after errors. @@ -17,7 +28,9 @@ class RestoreContext: def __init__(self, config: Dict): self._state_file = config["restore_context_path"] - self._databases: Dict[str, Dict[str, List]] = {} + self._databases: Dict[str, Dict[str, Dict[str, PartState]]] = defaultdict( + lambda: defaultdict(lambda: defaultdict(lambda: PartState.INVALID)) + ) self._failed: Mapping[str, Any] = defaultdict( lambda: defaultdict( lambda: { @@ -29,27 +42,26 @@ def __init__(self, config: Dict): if exists(self._state_file): self._load_state() - def add_table(self, database: str, table: str) -> None: + def add_part(self, part: PartMetadata, state: PartState) -> None: """ - Add table to restore metadata. + Records the given restore state for the data part. """ - if database not in self._databases: - self._databases[database] = {} + self._databases[part.database][part.table][part.name] = state - if table not in self._databases[database]: - self._databases[database][table] = [] + def _part(self, part: PartMetadata) -> PartState: + return self._databases[part.database][part.table][part.name] - def add_part(self, part: PartMetadata) -> None: + def part_downloaded(self, part: PartMetadata) -> bool: """ - Marks that data part was restored. + Checks if data part was downloaded. 
""" - self._databases[part.database][part.table].append(part.name) + return self._part(part) == PartState.DOWNLOADED def part_restored(self, part: PartMetadata) -> bool: """ Checks if data part was restored. """ - return part.name in self._databases[part.database][part.table] + return self._part(part) == PartState.RESTORED def add_failed_chown(self, database: str, table: str, path: str) -> None: """ @@ -85,4 +97,7 @@ def dump_state(self) -> None: def _load_state(self) -> None: with open(self._state_file, "r", encoding="utf-8") as f: state: Dict[str, Any] = json.load(f) - self._databases = state["databases"] + for db, tables in state.get("databases", {}).items(): + for table, parts in tables.items(): + for part_name, part_state in parts.items(): + self._databases[db][table][part_name] = part_state diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index f4793142..43b6f7d2 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -16,6 +16,7 @@ deduplicate_part, ) from ch_backup.backup.metadata import PartMetadata, TableMetadata +from ch_backup.backup.restore_context import PartState from ch_backup.backup_context import BackupContext from ch_backup.clickhouse.client import ClickhouseError from ch_backup.clickhouse.disks import ClickHouseTemporaryDisks @@ -561,6 +562,7 @@ def _restore_data( skip_cloud_storage: bool, keep_going: bool, ) -> None: + # pylint: disable=too-many-branches logging.info("Restoring tables data") for table_meta in tables: try: @@ -570,7 +572,6 @@ def _restore_data( table_meta.name, ) - context.restore_context.add_table(table_meta.database, table_meta.name) maybe_table = context.ch_ctl.get_table( table_meta.database, table_meta.name ) @@ -587,6 +588,13 @@ def _restore_data( ) continue + if context.restore_context.part_downloaded(part): + logging.debug( + f"{table.database}.{table.name} part {part.name} already downloading, only attach it" + ) + attach_parts.append(part) + continue + try: if part.disk_name in 
context.backup_meta.cloud_storage.disks: if skip_cloud_storage: @@ -615,6 +623,8 @@ def _restore_data( raise context.backup_layout.wait(keep_going) + for part in attach_parts: + context.restore_context.add_part(part, PartState.DOWNLOADED) context.ch_ctl.chown_detached_table_parts( table, context.restore_context @@ -628,7 +638,7 @@ def _restore_data( ) try: context.ch_ctl.attach_part(table, part.name) - context.restore_context.add_part(part) + context.restore_context.add_part(part, PartState.RESTORED) except Exception as e: logging.warning( 'Attaching "%s.%s" part %s failed: %s', @@ -638,6 +648,8 @@ def _restore_data( repr(e), ) context.restore_context.add_failed_part(part, e) + # Reset the part to INVALID so a retry does not treat it as already downloaded: the attach may have failed because the data was corrupted during download. + context.restore_context.add_part(part, PartState.INVALID) finally: context.restore_context.dump_state()