diff --git a/ch_backup/backup/layout.py b/ch_backup/backup/layout.py
index bfc02b32..5cdd2c3c 100644
--- a/ch_backup/backup/layout.py
+++ b/ch_backup/backup/layout.py
@@ -4,7 +4,7 @@
 import os
 from pathlib import Path
-from typing import List, Optional, Sequence
+from typing import Callable, List, Optional, Sequence
 from urllib.parse import quote
 
 from ch_backup import logging
@@ -151,7 +151,9 @@ def upload_udf(self, backup_name: str, file_name: str, metadata: str) -> None:
             msg = f'Failed to upload udf metadata "{remote_path}"'
             raise StorageError(msg) from e
 
-    def upload_data_part(self, backup_name: str, fpart: FrozenPart) -> None:
+    def upload_data_part(
+        self, backup_name: str, fpart: FrozenPart, callback: Callable
+    ) -> None:
         """
         Upload part data.
         """
@@ -174,6 +176,7 @@ def upload_data_part(self, backup_name: str, fpart: FrozenPart) -> None:
                 is_async=True,
                 encryption=True,
                 delete=True,
+                callback=callback,
             )
         except Exception as e:
             msg = f"Failed to create async upload of {remote_path}"
diff --git a/ch_backup/config.py b/ch_backup/config.py
index 7d3514fe..bc071203 100644
--- a/ch_backup/config.py
+++ b/ch_backup/config.py
@@ -64,6 +64,7 @@ def _as_seconds(t: str) -> int:
         "restore_context_path": "/tmp/ch_backup_restore_state.json",  # nosec
         "validate_part_after_upload": False,
         "restore_fail_on_attach_error": False,
+        "update_metadata_interval": _as_seconds("30 min"),
     },
     "storage": {
         "type": "s3",
diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py
index 6a2a034c..91603a5b 100644
--- a/ch_backup/logic/table.py
+++ b/ch_backup/logic/table.py
@@ -4,6 +4,7 @@
 import os
 from collections import deque
 from dataclasses import dataclass
+from functools import partial
 from itertools import chain
 from pathlib import Path
 from typing import Dict, Iterable, List, Optional, Sequence, Tuple
@@ -33,6 +34,7 @@
 )
 from ch_backup.exceptions import ClickhouseBackupError
 from ch_backup.logic.backup_manager import BackupManager
+from ch_backup.logic.upload_part_observer import UploadPartObserver
 from ch_backup.util import compare_schema, get_table_zookeeper_paths
 
@@ -123,7 +125,7 @@ def _backup(
                 if table.name not in mtimes:
                     continue
 
-                table_meta = self._backup_table(
+                self._backup_table(
                     context,
                     db,
                     table,
@@ -132,8 +134,6 @@ def _backup(
                     dedup_info.table(table.name),
                     mtimes,
                 )
-                if table_meta is not None:
-                    context.backup_meta.add_table(table_meta)
 
         context.backup_layout.upload_backup_metadata(context.backup_meta)
@@ -304,7 +304,7 @@ def _backup_table(
         schema_only: bool,
         dedup_info: TableDedupInfo,
         mtimes: Dict[str, TableMetadataMtime],
-    ) -> Optional[TableMetadata]:
+    ) -> None:
         """
         Make backup of metadata and data of single table.
         """
@@ -313,8 +313,6 @@ def _backup_table(
         logging.debug(
             'Performing table backup for "{}"."{}"', table.database, table.name
         )
-        table_meta = TableMetadata(table.database, table.name, table.engine, table.uuid)
-
         create_statement = self._load_create_statement_from_disk(table)
         if not create_statement:
             logging.warning(
@@ -322,7 +320,7 @@ def _backup_table(
                 db.name,
                 table.name,
             )
-            return None
+            return
 
         # Freeze only MergeTree tables
         if not schema_only and is_merge_tree(table.engine):
@@ -337,7 +335,7 @@ def _backup_table(
                 table.database,
                 table.name,
             )
-            return None
+            return
 
         # Check if table metadata was updated
         new_mtime = self._get_mtime(table.metadata_path)
@@ -348,25 +346,24 @@ def _backup_table(
                 table.name,
             )
             context.ch_ctl.remove_freezed_data()
-            return None
+            return
+
+        # Add table metadata to backup metadata
+        context.backup_meta.add_table(
+            TableMetadata(table.database, table.name, table.engine, table.uuid)
+        )
 
         # Backup table metadata
         context.backup_layout.upload_table_create_statement(
             context.backup_meta.name, db, table, create_statement
         )
         # Backup table data
         if not schema_only:
-            self._backup_frozen_table_data(
-                context, table, table_meta, backup_name, dedup_info
-            )
-
-        return table_meta
+            self._backup_frozen_table_data(context, table, backup_name, dedup_info)
 
     def _backup_frozen_table_data(
         self,
         context: BackupContext,
         table: Table,
-        table_meta: TableMetadata,
         backup_name: str,
         dedup_info: TableDedupInfo,
     ) -> None:
@@ -383,7 +380,8 @@ def _backup_frozen_table_data(
 
         logging.debug('Uploading table data for "{}"."{}"', table.database, table.name)
 
-        uploaded_parts = []
+        upload_observer = UploadPartObserver(context)
+
         for data_path, disk in table.paths_with_disks:
             freezed_parts = context.ch_ctl.list_frozen_parts(
                 table, disk, data_path, backup_name
@@ -391,27 +389,29 @@ def _backup_frozen_table_data(
             )
 
             for fpart in freezed_parts:
                 logging.debug("Working on {}", fpart)
+                part = PartMetadata.from_frozen_part(fpart)
 
                 if disk.type == "s3":
-                    table_meta.add_part(PartMetadata.from_frozen_part(fpart))
+                    context.backup_meta.add_part(part)
                     continue
 
                 # trying to find part in storage
-                part = deduplicate_part(context.backup_layout, fpart, dedup_info)
-                if part:
+                deduplicated_part = deduplicate_part(
+                    context.backup_layout, fpart, dedup_info
+                )
+                if deduplicated_part:
                     context.ch_ctl.remove_freezed_part(fpart)
+                    context.backup_meta.add_part(deduplicated_part)
                 else:
                     context.backup_layout.upload_data_part(
-                        context.backup_meta.name, fpart
+                        context.backup_meta.name,
+                        fpart,
+                        partial(upload_observer, part),
                     )
-                    part = PartMetadata.from_frozen_part(fpart)
-                    uploaded_parts.append(part)
-
-                table_meta.add_part(part)  # type: ignore
 
         context.backup_layout.wait()
 
-        self._validate_uploaded_parts(context, uploaded_parts)
+        self._validate_uploaded_parts(context, upload_observer.uploaded_parts)
 
         context.ch_ctl.remove_freezed_data()
diff --git a/ch_backup/logic/upload_part_observer.py b/ch_backup/logic/upload_part_observer.py
new file mode 100644
index 00000000..fb78b305
--- /dev/null
+++ b/ch_backup/logic/upload_part_observer.py
@@ -0,0 +1,44 @@
+"""Uploading part observer."""
+import time
+from typing import List, Optional
+
+from ch_backup.backup.metadata.part_metadata import PartMetadata
+from ch_backup.backup_context import BackupContext
+
+
+class UploadPartObserver:
+    """
+    Observe uploading parts.
+
+    Update backup metadata with specified interval after completion of
+    uploading every part to object storage.
+ """ + + def __init__(self, context: BackupContext) -> None: + self._context = context + self._last_time = time.time() + self._uploaded_parts: List[PartMetadata] = [] + self._interval = self._context.config["update_metadata_interval"] + + def __call__( + self, part: PartMetadata, exception: Optional[Exception] = None + ) -> None: + if exception: + return + + self._uploaded_parts.append(part) + self._context.backup_meta.add_part(part) + + now = time.time() + if now - self._last_time >= self._interval: + self._context.backup_layout.upload_backup_metadata( + self._context.backup_meta + ) + self._last_time = now + + @property + def uploaded_parts(self) -> List[PartMetadata]: + """ + Return uploaded parts metadata. + """ + return self._uploaded_parts diff --git a/ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py b/ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py index cb6c071f..0b7e9269 100644 --- a/ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py +++ b/ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py @@ -1,12 +1,25 @@ """ Class for executing callables on specified pool. """ -from concurrent.futures import ALL_COMPLETED, Executor, Future, wait -from typing import Any, Callable, Dict +from concurrent.futures import Executor, Future, as_completed +from dataclasses import dataclass +from typing import Any, Callable, Dict, Optional from ch_backup import logging +@dataclass +class Job: + """ + Job submitted to ExecPool. + + Callback is executed after job completion. + """ + + id_: str + callback: Optional[Callable] + + class ExecPool: """ Submit tasks on provided executor. @@ -15,7 +28,7 @@ class ExecPool: """ def __init__(self, executor: Executor) -> None: - self._futures: Dict[str, Future] = {} + self._future_to_job: Dict[Future, Job] = {} self._pool = executor def shutdown(self, graceful: bool = True) -> None: @@ -24,15 +37,22 @@ def shutdown(self, graceful: bool = True) -> None: """ self._pool.shutdown(wait=graceful) - def submit(self, future_id: str, func: Callable, *args: Any, **kwargs: Any) -> None: + def submit( + self, + job_id: str, + func: Callable, + callback: Optional[Callable], + *args: Any, + **kwargs: Any + ) -> None: """ Schedule job for execution """ - if future_id in self._futures: + if job_id in [job.id_ for job in self._future_to_job.values()]: raise RuntimeError("Duplicate") + future = self._pool.submit(func, *args, **kwargs) - future.add_done_callback(lambda _: logging.debug("Future {} completed", future_id)) # type: ignore[misc] - self._futures[future_id] = future + self._future_to_job[future] = Job(job_id, callback) def wait_all(self, keep_going: bool = False) -> None: """ @@ -41,24 +61,31 @@ def wait_all(self, keep_going: bool = False) -> None: Args: keep_going - skip exceptions raised by futures instead of propagating it. 
""" - wait(self._futures.values(), return_when=ALL_COMPLETED) + for future in as_completed(self._future_to_job): + job = self._future_to_job[future] + logging.debug("Future {} completed", job.id_) - for future_id, future in self._futures.items(): try: future.result() - except Exception: + except Exception as ex: + if job.callback: + job.callback(ex) if keep_going: logging.warning( - 'Future "{}" generated an exception, skipping due to keep_going flag', - future_id, + 'Job "{}" generated an exception, skipping due to keep_going flag', + job.id_, exc_info=True, ) continue logging.error( - 'Future "{}" generated an exception:', future_id, exc_info=True + 'Job "{}" generated an exception:', job.id_, exc_info=True ) raise - self._futures = {} + + if job.callback: + job.callback() + + self._future_to_job = {} def __del__(self) -> None: """ diff --git a/ch_backup/storage/async_pipeline/pipeline_executor.py b/ch_backup/storage/async_pipeline/pipeline_executor.py index fb9a3a57..7d835988 100755 --- a/ch_backup/storage/async_pipeline/pipeline_executor.py +++ b/ch_backup/storage/async_pipeline/pipeline_executor.py @@ -85,6 +85,7 @@ def upload_files_tarball( is_async: bool, encryption: bool, delete: bool, + callback: Optional[Callable] = None, ) -> None: """ Archive to tarball and upload files from local filesystem. @@ -101,7 +102,7 @@ def upload_files_tarball( encryption, delete_after=delete, ) - self._exec_pipeline(job_id, pipeline, is_async) + self._exec_pipeline(job_id, pipeline, is_async, callback) def download_data( self, remote_path: str, is_async: bool, encryption: bool @@ -169,15 +170,30 @@ def wait(self, keep_going: bool = False) -> None: if self._exec_pool: self._exec_pool.wait_all(keep_going) - def _exec_pipeline(self, job_id: str, pipeline: Callable, is_async: bool) -> Any: + def _exec_pipeline( + self, + job_id: str, + pipeline: Callable, + is_async: bool, + callback: Optional[Callable] = None, + ) -> Any: """ Run pipeline inplace or schedule for exec in process pool """ if is_async and self._exec_pool: - return self._exec_pool.submit(job_id, pipeline) - - return pipeline() + return self._exec_pool.submit(job_id, pipeline, callback) + + try: + result = pipeline() + except Exception as ex: + if callback: + callback(ex) + raise + + if callback: + callback() + return result @staticmethod def _make_job_id(job_name: str, *args: Any) -> str: diff --git a/ch_backup/storage/loader.py b/ch_backup/storage/loader.py index 543b7c4b..035412d7 100644 --- a/ch_backup/storage/loader.py +++ b/ch_backup/storage/loader.py @@ -2,7 +2,7 @@ Module providing API for storage management (upload and download data, check remote path on existence, etc.). """ -from typing import List, Sequence +from typing import Callable, List, Optional, Sequence from ch_backup.storage.async_pipeline.pipeline_executor import PipelineExecutor from ch_backup.storage.engine import get_storage_engine @@ -76,6 +76,7 @@ def upload_files_tarball( is_async: bool = False, encryption: bool = False, delete: bool = False, + callback: Optional[Callable] = None, ) -> str: """ Upload multiple files as tarball. 
@@ -89,6 +90,7 @@ def upload_files_tarball(
             is_async=is_async,
             encryption=encryption,
             delete=delete,
+            callback=callback,
         )
         return remote_path
 
diff --git a/tests/unit/test_upload_part_observer.py b/tests/unit/test_upload_part_observer.py
new file mode 100644
index 00000000..f5f928a1
--- /dev/null
+++ b/tests/unit/test_upload_part_observer.py
@@ -0,0 +1,117 @@
+import copy
+from typing import List
+from unittest.mock import Mock, patch
+
+from tests.unit.utils import parametrize
+
+from ch_backup.backup.metadata.backup_metadata import BackupMetadata
+from ch_backup.backup.metadata.part_metadata import PartMetadata
+from ch_backup.backup.metadata.table_metadata import TableMetadata
+from ch_backup.backup_context import BackupContext
+from ch_backup.clickhouse.models import Database
+from ch_backup.logic.upload_part_observer import UploadPartObserver
+
+UUID = "fa8ff291-1922-4b7f-afa7-06633d5e16ae"
+DB_NAME = "test_db"
+TABLE_NAME = "test_table"
+ENGINE = "MergeTree"
+BACKUP_NAME = "TestBackup"
+BACKUP_META = BackupMetadata(
+    name=BACKUP_NAME,
+    path=f"ch_backup/{BACKUP_NAME}",
+    version="1.0.100",
+    ch_version="19.1.16",
+    time_format="%Y-%m-%dT%H:%M:%S%Z",
+    hostname="clickhouse01.test_net_711",
+)
+DB = Database(DB_NAME, ENGINE, f"/var/lib/clickhouse/metadata/{DB_NAME}.sql")
+
+
+@parametrize(
+    {
+        "id": "One part before interval",
+        "args": {
+            "times": [0, 1],
+            "part_names": ["1"],
+            "interval": 2,
+            "expected_upload_metadata": 0,
+        },
+    },
+    {
+        "id": "One part after interval",
+        "args": {
+            "times": [0, 2],
+            "part_names": ["1"],
+            "interval": 1,
+            "expected_upload_metadata": 1,
+        },
+    },
+    {
+        "id": "One before. One after",
+        "args": {
+            "times": [0, 1, 10],
+            "part_names": ["1", "2"],
+            "interval": 5,
+            "expected_upload_metadata": 1,
+        },
+    },
+    {
+        "id": "Two parts after interval",
+        "args": {
+            "times": [0, 1, 10],
+            "part_names": ["1", "2"],
+            "interval": 1,
+            "expected_upload_metadata": 2,
+        },
+    },
+    {
+        "id": "Mix",
+        "args": {
+            "times": [0, 1, 2, 10, 20],
+            "part_names": ["1", "2", "3", "4"],
+            "interval": 5,
+            "expected_upload_metadata": 2,
+        },
+    },
+)
+def test_observer(
+    times: List[int],
+    part_names: List[str],
+    interval: int,
+    expected_upload_metadata: int,
+) -> None:
+    config = {"backup": {"update_metadata_interval": interval}}
+
+    backup_meta = copy.deepcopy(BACKUP_META)
+    backup_meta.add_database(DB)
+
+    context = BackupContext(config)  # type: ignore[arg-type]
+    context.backup_meta = backup_meta
+
+    # Add table metadata to backup metadata
+    context.backup_meta.add_table(TableMetadata(DB_NAME, TABLE_NAME, ENGINE, UUID))
+
+    context.backup_layout = Mock()
+
+    with patch("time.time", side_effect=times):
+        observer = UploadPartObserver(context)
+
+        for name in part_names:
+            part = PartMetadata(
+                DB_NAME,
+                TABLE_NAME,
+                name,
+                "AABBCCDD",
+                1000,
+                ["column1.idx"],
+                True,
+                None,
+                None,
+            )
+            observer(part)
+
+    assert (
+        context.backup_layout.upload_backup_metadata.call_count
+        == expected_upload_metadata
+    )
+    assert len(context.backup_meta.get_parts()) == len(part_names)
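
Reviewer note (illustration only, not part of the diff): the snippet below is a minimal, self-contained sketch of the callback flow this change wires together. upload_data_part() receives a callback built with functools.partial(upload_observer, part); the pool invokes that callback after the job completes, passing the exception on failure and nothing on success; the observer then refreshes backup metadata at most once per interval. The names Observer, run_jobs and flush are hypothetical stand-ins for UploadPartObserver, ExecPool.wait_all and upload_backup_metadata, so treat this as a model under those assumptions rather than ch_backup code.

    # Self-contained sketch of the observer/callback pattern; all names here are illustrative.
    import time
    from concurrent.futures import Future, ThreadPoolExecutor, as_completed
    from functools import partial
    from typing import Callable, Dict, List, Optional


    class Observer:
        """Collects successfully uploaded parts and flushes metadata at most once per interval."""

        def __init__(self, interval: float, flush: Callable[[List[str]], None]) -> None:
            self._interval = interval
            self._flush = flush
            self._last_time = time.time()
            self.uploaded: List[str] = []

        def __call__(self, part: str, exception: Optional[Exception] = None) -> None:
            if exception:
                return  # failed upload: leave metadata untouched
            self.uploaded.append(part)
            now = time.time()
            if now - self._last_time >= self._interval:
                self._flush(self.uploaded)  # periodic metadata update
                self._last_time = now


    def run_jobs(jobs: Dict[str, Callable[[], None]], callbacks: Dict[str, Callable]) -> None:
        """Run jobs on a pool and invoke each job's callback after completion (ExecPool-style)."""
        with ThreadPoolExecutor() as pool:
            future_to_id: Dict[Future, str] = {pool.submit(fn): job_id for job_id, fn in jobs.items()}
            for future in as_completed(future_to_id):
                job_id = future_to_id[future]
                try:
                    future.result()
                except Exception as ex:
                    callbacks[job_id](ex)  # report the failure to the callback, then propagate
                    raise
                callbacks[job_id]()  # success: call back without an exception


    if __name__ == "__main__":
        observer = Observer(interval=0.0, flush=lambda parts: print("metadata flush:", parts))
        run_jobs(
            {"part_1": lambda: None},
            {"part_1": partial(observer, "part_1")},
        )
        print("uploaded:", observer.uploaded)

The design point the diff itself encodes is the same: backup metadata is refreshed from the upload-completion callback at a configurable interval (backup.update_metadata_interval, default 30 minutes) instead of only once after every part has been uploaded.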