Skip to content

Commit

Permalink
[ORION-3581] Do not redownload parts that was downloaded, but not att…
Browse files Browse the repository at this point in the history
…ached

Change-Id: I13aba57ca58b75192481897673b6a705d7cfe0c8
  • Loading branch information
Pervakov Grigorii committed Sep 8, 2023
1 parent cfc8dea commit 497bfbd
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 15 deletions.
41 changes: 28 additions & 13 deletions ch_backup/backup/restore_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,33 @@

import json
from collections import defaultdict
from enum import Enum
from os.path import exists
from typing import Any, Dict, List, Mapping
from typing import Any, Dict, Mapping

from ch_backup.backup.metadata import PartMetadata


class PartState(str, Enum):
"""
Represents status of the data part, during restore.
"""

INVALID = "invalid"
DOWNLOADED = "downloaded"
RESTORED = "restored"


class RestoreContext:
"""
Backup restore context. Allows continue restore process after errors.
"""

def __init__(self, config: Dict):
self._state_file = config["restore_context_path"]
self._databases: Dict[str, Dict[str, List]] = {}
self._databases: Dict[str, Dict[str, Dict[str, PartState]]] = defaultdict(
lambda: defaultdict(lambda: defaultdict(lambda: PartState.INVALID))
)
self._failed: Mapping[str, Any] = defaultdict(
lambda: defaultdict(
lambda: {
Expand All @@ -29,27 +42,26 @@ def __init__(self, config: Dict):
if exists(self._state_file):
self._load_state()

def add_table(self, database: str, table: str) -> None:
def add_part(self, part: PartMetadata, state: PartState) -> None:
"""
Add table to restore metadata.
Marks that data part was restored.
"""
if database not in self._databases:
self._databases[database] = {}
self._databases[part.database][part.table][part.name] = state

if table not in self._databases[database]:
self._databases[database][table] = []
def _part(self, part: PartMetadata) -> PartState:
return self._databases[part.database][part.table][part.name]

def add_part(self, part: PartMetadata) -> None:
def part_downloaded(self, part: PartMetadata) -> bool:
"""
Marks that data part was restored.
Checks if data part was downloaded.
"""
self._databases[part.database][part.table].append(part.name)
return self._part(part) == PartState.DOWNLOADED

def part_restored(self, part: PartMetadata) -> bool:
"""
Checks if data part was restored.
"""
return part.name in self._databases[part.database][part.table]
return self._part(part) == PartState.RESTORED

def add_failed_chown(self, database: str, table: str, path: str) -> None:
"""
Expand Down Expand Up @@ -85,4 +97,7 @@ def dump_state(self) -> None:
def _load_state(self) -> None:
with open(self._state_file, "r", encoding="utf-8") as f:
state: Dict[str, Any] = json.load(f)
self._databases = state["databases"]
for db, tables in state.get("databases", {}).items():
for table, parts in tables.items():
for part_name, part_state in parts.items():
self._databases[db][table][part_name] = part_state
16 changes: 14 additions & 2 deletions ch_backup/logic/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
deduplicate_part,
)
from ch_backup.backup.metadata import PartMetadata, TableMetadata
from ch_backup.backup.restore_context import PartState
from ch_backup.backup_context import BackupContext
from ch_backup.clickhouse.client import ClickhouseError
from ch_backup.clickhouse.disks import ClickHouseTemporaryDisks
Expand Down Expand Up @@ -561,6 +562,7 @@ def _restore_data(
skip_cloud_storage: bool,
keep_going: bool,
) -> None:
# pylint: disable=too-many-branches
logging.info("Restoring tables data")
for table_meta in tables:
try:
Expand All @@ -570,7 +572,6 @@ def _restore_data(
table_meta.name,
)

context.restore_context.add_table(table_meta.database, table_meta.name)
maybe_table = context.ch_ctl.get_table(
table_meta.database, table_meta.name
)
Expand All @@ -587,6 +588,13 @@ def _restore_data(
)
continue

if context.restore_context.part_downloaded(part):
logging.debug(
f"{table.database}.{table.name} part {part.name} already downloading, only attach it"
)
attach_parts.append(part)
continue

try:
if part.disk_name in context.backup_meta.cloud_storage.disks:
if skip_cloud_storage:
Expand Down Expand Up @@ -615,6 +623,8 @@ def _restore_data(
raise

context.backup_layout.wait(keep_going)
for part in attach_parts:
context.restore_context.add_part(part, PartState.DOWNLOADED)

context.ch_ctl.chown_detached_table_parts(
table, context.restore_context
Expand All @@ -628,7 +638,7 @@ def _restore_data(
)
try:
context.ch_ctl.attach_part(table, part.name)
context.restore_context.add_part(part)
context.restore_context.add_part(part, PartState.RESTORED)
except Exception as e:
logging.warning(
'Attaching "%s.%s" part %s failed: %s',
Expand All @@ -638,6 +648,8 @@ def _restore_data(
repr(e),
)
context.restore_context.add_failed_part(part, e)
# if part failed to attach due to corrupted data during download
context.restore_context.add_part(part, PartState.INVALID)
finally:
context.restore_context.dump_state()

Expand Down

0 comments on commit 497bfbd

Please sign in to comment.