Skip to content

Commit

Permalink
Do not redownload parts that was downloaded, but not attached
Browse files Browse the repository at this point in the history
Change-Id: I13aba57ca58b75192481897673b6a705d7cfe0c8
  • Loading branch information
Pervakov Grigorii committed Sep 6, 2023
1 parent 2656373 commit dd0ec51
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 14 deletions.
36 changes: 24 additions & 12 deletions ch_backup/backup/restore_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,33 @@

import json
from collections import defaultdict
from enum import Enum
from os.path import exists
from typing import Any, Dict, List, Mapping
from typing import Any, Dict, Mapping

from ch_backup.backup.metadata import PartMetadata


class PartState(str, Enum):
"""
Represents status of the data part, during restore.
"""

INVALID = "invalid"
DOWNLOADED = "downloaded"
RESTORED = "restored"


class RestoreContext:
"""
Backup restore context. Allows continue restore process after errors.
"""

def __init__(self, config: Dict):
self._state_file = config["restore_context_path"]
self._databases: Dict[str, Dict[str, List]] = {}
self._databases: Dict[str, Dict[str, Dict[str, PartState]]] = defaultdict(
lambda: defaultdict(lambda: defaultdict(lambda: PartState.INVALID))
)
self._failed: Mapping[str, Any] = defaultdict(
lambda: defaultdict(
lambda: {
Expand All @@ -29,27 +42,26 @@ def __init__(self, config: Dict):
if exists(self._state_file):
self._load_state()

def add_table(self, database: str, table: str) -> None:
def add_part(self, part: PartMetadata, state: PartState) -> None:
"""
Add table to restore metadata.
Marks that data part was restored.
"""
if database not in self._databases:
self._databases[database] = {}
self._databases[part.database][part.table][part.name] = state

if table not in self._databases[database]:
self._databases[database][table] = []
def _part(self, part: PartMetadata) -> PartState:
return self._databases[part.database][part.table][part.name]

def add_part(self, part: PartMetadata) -> None:
def part_downloaded(self, part: PartMetadata) -> bool:
"""
Marks that data part was restored.
Checks if data part was downloaded.
"""
self._databases[part.database][part.table].append(part.name)
return self._part(part) == PartState.DOWNLOADED

def part_restored(self, part: PartMetadata) -> bool:
"""
Checks if data part was restored.
"""
return part.name in self._databases[part.database][part.table]
return self._part(part) == PartState.RESTORED

def add_failed_chown(self, database: str, table: str, path: str) -> None:
"""
Expand Down
12 changes: 10 additions & 2 deletions ch_backup/logic/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
deduplicate_part,
)
from ch_backup.backup.metadata import PartMetadata, TableMetadata
from ch_backup.backup.restore_context import PartState
from ch_backup.backup_context import BackupContext
from ch_backup.clickhouse.client import ClickhouseError
from ch_backup.clickhouse.disks import ClickHouseTemporaryDisks
Expand Down Expand Up @@ -570,7 +571,6 @@ def _restore_data(
table_meta.name,
)

context.restore_context.add_table(table_meta.database, table_meta.name)
maybe_table = context.ch_ctl.get_table(
table_meta.database, table_meta.name
)
Expand All @@ -587,6 +587,13 @@ def _restore_data(
)
continue

if context.restore_context.part_downloaded(part):
logging.debug(
f"{table.database}.{table.name} part {part.name} already downloading, only attach it"
)
attach_parts.append(part)
continue

try:
if part.disk_name in context.backup_meta.cloud_storage.disks:
if skip_cloud_storage:
Expand All @@ -605,6 +612,7 @@ def _restore_data(
context.backup_meta, part, fs_part_path
)

context.restore_context.add_part(part, PartState.DOWNLOADED)
attach_parts.append(part)
except Exception:
if keep_going:
Expand All @@ -628,7 +636,7 @@ def _restore_data(
)
try:
context.ch_ctl.attach_part(table, part.name)
context.restore_context.add_part(part)
context.restore_context.add_part(part, PartState.RESTORED)
except Exception as e:
logging.warning(
'Attaching "%s.%s" part %s failed: %s',
Expand Down

0 comments on commit dd0ec51

Please sign in to comment.