From 0214f1ef7e18c76437d45d7e023b607b5478312c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?=
 =?UTF-8?q?=D0=B1=D0=B0=D1=80?=
Date: Tue, 2 Jul 2024 02:18:06 +0300
Subject: [PATCH] Parallel backup

---
 ch_backup/backup/metadata/backup_metadata.py  |  27 ++--
 ch_backup/ch_backup.py                        |   2 +-
 ch_backup/clickhouse/control.py               |  25 ++--
 ch_backup/config.py                           |   4 +-
 ch_backup/logic/table.py                      | 136 ++++++++++--------
 tests/integration/ch_backup.featureset        |   2 +-
 ...rallel.feature => backup_parallel.feature} |   8 +-
 tests/unit/test_backup_tables.py              |   2 +-
 8 files changed, 112 insertions(+), 94 deletions(-)
 rename tests/integration/features/{freeze_parallel.feature => backup_parallel.feature} (87%)

diff --git a/ch_backup/backup/metadata/backup_metadata.py b/ch_backup/backup/metadata/backup_metadata.py
index 546ec6da..a686345a 100644
--- a/ch_backup/backup/metadata/backup_metadata.py
+++ b/ch_backup/backup/metadata/backup_metadata.py
@@ -6,6 +6,7 @@
 import socket
 from datetime import datetime, timezone
 from enum import Enum
+from threading import Lock
 from typing import Any, Dict, List, Optional, Sequence

 from ch_backup.backup.metadata.access_control_metadata import AccessControlMetadata
@@ -70,6 +71,8 @@ def __init__(
         self._user_defined_functions: List[str] = []
         self._named_collections: List[str] = []

+        self._lock = Lock()
+
     def __str__(self) -> str:
         return self.dump_json()

@@ -264,16 +267,17 @@ def add_table(self, table: TableMetadata) -> None:
         """
         Add table to backup metadata.
         """
-        tables = self._databases[table.database]["tables"]
+        with self._lock:
+            tables = self._databases[table.database]["tables"]

-        assert table.name not in tables
+            assert table.name not in tables

-        tables[table.name] = table.raw_metadata
+            tables[table.name] = table.raw_metadata

-        for part in table.get_parts():
-            self.size += part.size
-            if not part.link:
-                self.real_size += part.size
+            for part in table.get_parts():
+                self.size += part.size
+                if not part.link:
+                    self.real_size += part.size

     def add_udf(self, udf_name: str) -> None:
         """
@@ -328,11 +332,12 @@ def add_part(self, part: PartMetadata) -> None:
         """
         Add data part to backup metadata.
         """
-        self.get_table(part.database, part.table).add_part(part)
+        with self._lock:
+            self.get_table(part.database, part.table).add_part(part)

-        self.size += part.size
-        if not part.link:
-            self.real_size += part.size
+            self.size += part.size
+            if not part.link:
+                self.real_size += part.size

     def remove_parts(self, table: TableMetadata, parts: List[PartMetadata]) -> None:
         """
diff --git a/ch_backup/ch_backup.py b/ch_backup/ch_backup.py
index f2f514fd..ff745b85 100644
--- a/ch_backup/ch_backup.py
+++ b/ch_backup/ch_backup.py
@@ -49,7 +49,7 @@ def __init__(self, config: Config) -> None:
         self._access_backup_manager = AccessBackup()
         self._database_backup_manager = DatabaseBackup()
         self._table_backup_manager = TableBackup(
-            self._config.get("multiprocessing").get("freeze_workers", 1)
+            self._config.get("multiprocessing").get("backup_threads", 1)
         )
         self._udf_backup_manager = UDFBackup()
         self._nc_backup_manager = NamedCollectionsBackup()
diff --git a/ch_backup/clickhouse/control.py b/ch_backup/clickhouse/control.py
index 10714154..34a8b422 100644
--- a/ch_backup/clickhouse/control.py
+++ b/ch_backup/clickhouse/control.py
@@ -210,9 +210,12 @@
     CREATE TABLE IF NOT EXISTS `{system_db}`._deduplication_info_current (
         name String,
         checksum String,
+        database String,
+        table String,
     )
     ENGINE = MergeTree()
     ORDER BY (name, checksum)
+    PARTITION BY (database, table)
     """
 )

@@ -954,12 +957,13 @@ def create_deduplication_table(self):
                 system_db=escape(self._backup_config["system_database"])
             )
         )
-        self._ch_client.query(
-            TRUNCATE_TABLE_IF_EXISTS_SQL.format(
-                db_name=escape(self._backup_config["system_database"]),
-                table_name="_deduplication_info",
+        for table_name in ["_deduplication_info", "_deduplication_info_current"]:
+            self._ch_client.query(
+                TRUNCATE_TABLE_IF_EXISTS_SQL.format(
+                    db_name=escape(self._backup_config["system_database"]),
+                    table_name=table_name,
+                )
             )
-        )
         self._ch_client.query(
             CREATE_IF_NOT_EXISTS_DEDUP_TABLE_SQL.format(
                 system_db=escape(self._backup_config["system_database"])
@@ -985,19 +989,16 @@ def get_deduplication_info(
         """
         Get deduplication info for given frozen parts of a table
         """
-        self._ch_client.query(
-            TRUNCATE_TABLE_IF_EXISTS_SQL.format(
-                db_name=escape(self._backup_config["system_database"]),
-                table_name="_deduplication_info_current",
-            )
-        )
         self._ch_client.query(
             CREATE_IF_NOT_EXISTS_DEDUP_TABLE_CURRENT_SQL.format(
                 system_db=escape(self._backup_config["system_database"])
             )
         )

-        batch = [f"('{part.name}','{part.checksum}')" for part in frozen_parts.values()]
+        batch = [
+            f"('{part.name}','{part.checksum}','{database}','{table}')"
+            for part in frozen_parts.values()
+        ]
         self._ch_client.query(
             INSERT_DEDUP_INFO_BATCH_SQL.format(
                 system_db=escape(self._backup_config["system_database"]),
diff --git a/ch_backup/config.py b/ch_backup/config.py
index 40484f67..4e4c3eaa 100644
--- a/ch_backup/config.py
+++ b/ch_backup/config.py
@@ -156,8 +156,8 @@ def _as_seconds(t: str) -> int:
         "workers": 4,
         # The number of processes for parts restoring from S3 disks.
         "cloud_storage_restore_workers": 4,
-        # The number of processes for parallel freeze of tables
-        "freeze_workers": 4,
+        # The number of threads for parallel freeze and backup of tables
+        "backup_threads": 4,
     },
     "pipeline": {
         # Is asynchronous pipelines used (based on Pypeln library)
diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py
index 0e6f07e2..4bd97578 100644
--- a/ch_backup/logic/table.py
+++ b/ch_backup/logic/table.py
@@ -4,12 +4,12 @@

 import os
 from collections import deque
-from concurrent.futures import ThreadPoolExecutor, as_completed
+from concurrent.futures import Future, ThreadPoolExecutor, as_completed
 from dataclasses import dataclass
 from functools import partial
 from itertools import chain
 from pathlib import Path
-from typing import Callable, Dict, Iterable, List, Optional, Sequence, Tuple
+from typing import Dict, Iterable, List, Optional, Sequence, Tuple

 from ch_backup import logging
 from ch_backup.backup.deduplication import deduplicate_parts
@@ -50,8 +50,8 @@ class TableBackup(BackupManager):
     Table backup class
     """

-    def __init__(self, freeze_workers: int = 1):
-        self._freeze_workers = freeze_workers
+    def __init__(self, backup_threads: int = 1):
+        self._backup_threads = backup_threads

     def backup(
         self,
@@ -125,9 +125,10 @@ def _backup(

             context.ch_ctl.create_shadow_increment()
             with ThreadPoolExecutor(
-                max_workers=self._freeze_workers
-            ) as freeze_executor:
-                freeze_futures = []
+                max_workers=self._backup_threads
+            ) as backup_executor:
+                backup_futures: List[Future] = []
+                upload_observers: List[UploadPartObserver] = []

                 for table in context.ch_ctl.get_tables(db.name, tables):
                     if table.name not in mtimes:
@@ -139,9 +140,9 @@ def _backup(
                             table.name,
                         )

-                    freeze_futures.append(
-                        freeze_executor.submit(
-                            self._freeze_table,
+                    backup_futures.append(
+                        backup_executor.submit(
+                            self._backup_table,
                             context,
                             db,
                             table,
@@ -151,15 +152,51 @@ def _backup(
                         )
                     )

-                for freeze_future in as_completed(freeze_futures):
-                    backup_freezed_table = freeze_future.result()
-                    if backup_freezed_table is not None:
-                        backup_freezed_table()
+                for backup_future in as_completed(backup_futures):
+                    upload_observer = backup_future.result()
+                    if upload_observer:
+                        upload_observers.append(upload_observer)
+
+                context.backup_layout.wait()
+
+                logging.debug(
+                    'All tables from "{}" are processed, validating uploaded parts.',
+                    db.name,
+                )
+                for upload_observer in upload_observers:
+                    self._validate_uploaded_parts(
+                        context, upload_observer.uploaded_parts
+                    )

             context.ch_ctl.remove_freezed_data()

         context.backup_layout.upload_backup_metadata(context.backup_meta)

+    def _backup_table(
+        self,
+        context: BackupContext,
+        db: Database,
+        table: Table,
+        backup_name: str,
+        schema_only: bool,
+        mtimes: Dict[str, TableMetadataMtime],
+    ) -> Optional[UploadPartObserver]:
+        create_statement = self._load_create_statement_from_disk(table)
+        if not create_statement:
+            logging.warning(
+                'Skipping table backup for "{}"."{}". Local metadata is empty or absent',
+                db.name,
+                table.name,
+            )
+            return None
+        # Freeze only MergeTree tables
+        if not schema_only and is_merge_tree(table.engine):
+            if not self._freeze_table(context, table, backup_name):
+                return None
+        return self._backup_table_after_freeze(
+            context, db, table, backup_name, schema_only, mtimes, create_statement
+        )
+
     @staticmethod
     def _backup_cloud_storage_metadata(context: BackupContext) -> None:
         """
@@ -329,50 +366,27 @@ def _load_create_statement_from_disk(table: Table) -> Optional[bytes]:
     def _freeze_table(
         self,
         context: BackupContext,
-        db: Database,
         table: Table,
         backup_name: str,
-        schema_only: bool,
-        mtimes: Dict[str, TableMetadataMtime],
-    ) -> Optional[Callable]:
+    ) -> bool:
         """
-        Freeze table and return function which backups freezed table.
+        Freeze table.
         """
         logging.debug('Trying to freeze "{}"."{}"', table.database, table.name)
-        create_statement = self._load_create_statement_from_disk(table)
-        if not create_statement:
+
+        try:
+            context.ch_ctl.freeze_table(backup_name, table)
+        except ClickhouseError:
+            if context.ch_ctl.does_table_exist(table.database, table.name):
+                raise
+
             logging.warning(
-                'Skipping table backup for "{}"."{}". Local metadata is empty or absent',
-                db.name,
+                'Table "{}"."{}" was removed by a user during backup',
+                table.database,
                 table.name,
             )
-            return None
-
-        # Freeze only MergeTree tables
-        if not schema_only and is_merge_tree(table.engine):
-            try:
-                context.ch_ctl.freeze_table(backup_name, table)
-            except ClickhouseError:
-                if context.ch_ctl.does_table_exist(table.database, table.name):
-                    raise
-
-                logging.warning(
-                    'Table "{}"."{}" was removed by a user during backup',
-                    table.database,
-                    table.name,
-                )
-                return None
-
-        return partial(
-            self._backup_table_after_freeze,
-            context,
-            db,
-            table,
-            backup_name,
-            schema_only,
-            mtimes,
-            create_statement,
-        )
+            return False
+        return True

     def _backup_table_after_freeze(
         self,
@@ -383,7 +397,7 @@ def _backup_table_after_freeze(
         schema_only: bool,
         mtimes: Dict[str, TableMetadataMtime],
         create_statement: bytes,
-    ) -> None:
+    ) -> Optional[UploadPartObserver]:
         # Check if table metadata was updated
         new_mtime = self._get_mtime(table.metadata_path)
         if new_mtime is None or mtimes[table.name].mtime != new_mtime:
@@ -392,8 +406,7 @@ def _backup_table_after_freeze(
                 table.database,
                 table.name,
             )
-            context.ch_ctl.remove_freezed_data(backup_name, table)
-            return
+            return None

         logging.debug(
             'Performing table backup for "{}"."{}"', table.database, table.name
@@ -408,14 +421,15 @@ def _backup_table_after_freeze(
         )
         # Backup table data
         if not schema_only:
-            self._backup_frozen_table_data(context, table, backup_name)
+            return self._backup_frozen_table_data(context, table, backup_name)
+        return None

     def _backup_frozen_table_data(
         self,
         context: BackupContext,
         table: Table,
         backup_name: str,
-    ) -> None:
+    ) -> Optional[UploadPartObserver]:
         """
         Backup table with data opposed to schema only.
         """
@@ -426,7 +440,9 @@ def deduplicate_parts_in_batch(
             frozen_parts: Dict[str, FrozenPart],
         ) -> None:
             logging.debug(
-                "Working on deduplication of {} frozen parts", len(frozen_parts)
+                'Working on deduplication of {} frozen parts of "{}"',
+                len(frozen_parts),
+                table.name,
             )
             deduplicated_parts = deduplicate_parts(
                 context, table.database, table.name, frozen_parts
@@ -458,7 +474,7 @@ def deduplicate_parts_in_batch(
                 table.database,
                 table.name,
             )
-            return
+            return None

         logging.debug('Uploading table data for "{}"."{}"', table.database, table.name)

@@ -483,11 +499,7 @@ def deduplicate_parts_in_batch(
         if frozen_parts_batch:
             deduplicate_parts_in_batch(context, upload_observer, frozen_parts_batch)

-        context.backup_layout.wait()
-
-        self._validate_uploaded_parts(context, upload_observer.uploaded_parts)
-
-        context.ch_ctl.remove_freezed_data(backup_name, table)
+        return upload_observer

     @staticmethod
     def _validate_uploaded_parts(context: BackupContext, uploaded_parts: list) -> None:
diff --git a/tests/integration/ch_backup.featureset b/tests/integration/ch_backup.featureset
index ddcb56e8..19f299e1 100644
--- a/tests/integration/ch_backup.featureset
+++ b/tests/integration/ch_backup.featureset
@@ -12,7 +12,7 @@ features/cloud_storage.feature
 features/database_engines.feature
 features/ddl_dictionary.feature
 features/deduplication.feature
-features/freeze_parallel.feature
+features/backup_parallel.feature
 features/incremental_restore.feature
 features/metadata.feature
 features/min_interval.feature
diff --git a/tests/integration/features/freeze_parallel.feature b/tests/integration/features/backup_parallel.feature
similarity index 87%
rename from tests/integration/features/freeze_parallel.feature
rename to tests/integration/features/backup_parallel.feature
index 7ab31028..98098333 100644
--- a/tests/integration/features/freeze_parallel.feature
+++ b/tests/integration/features/backup_parallel.feature
@@ -1,4 +1,4 @@
-Feature: Parallel freeze
+Feature: Parallel freeze and backup

   Background:
     Given default configuration
@@ -9,11 +9,11 @@ Feature: Parallel freeze
     And clickhouse on clickhouse01 has test schema with 5 databases and 10 tables
     And clickhouse01 has test clickhouse data test1 with 5 databases, 10 tables, 100 rows and 5 partitions

-  Scenario: Create backup with single freeze worker
+  Scenario: Create backup with single backup thread
    Given ch-backup configuration on clickhouse01
    """
    multiprocessing:
-        freeze_workers: 1
+        backup_threads: 1
    """
    When we create clickhouse01 clickhouse backup
    Then we got the following backups on clickhouse01
@@ -22,7 +22,7 @@ Feature: Parallel freeze
     When we restore clickhouse backup #0 to clickhouse02
     Then we got same clickhouse data at clickhouse01 clickhouse02

-  Scenario: Create backup with default number of freeze workers
+  Scenario: Create backup with default number of backup threads
     When we create clickhouse01 clickhouse backup
     Then we got the following backups on clickhouse01
       | num | state | data_count | link_count | title |
diff --git a/tests/unit/test_backup_tables.py b/tests/unit/test_backup_tables.py
index 2b32cb2b..ce1dfd60 100644
--- a/tests/unit/test_backup_tables.py
+++ b/tests/unit/test_backup_tables.py
@@ -29,7 +29,7 @@ def test_backup_table_skipping_if_metadata_updated_during_backup(
     context = BackupContext(DEFAULT_CONFIG)  # type: ignore[arg-type]
     db = Database(db_name, "MergeTree", "/var/lib/clickhouse/metadata/db1.sql")
     multiprocessing_conf: Dict[str, int] = DEFAULT_CONFIG.get("multiprocessing")  # type: ignore[assignment]
-    table_backup = TableBackup(multiprocessing_conf.get("freeze_workers", 1))
+    table_backup = TableBackup(multiprocessing_conf.get("backup_threads", 1))
     backup_meta = BackupMetadata(
         name="20181017T210300",
         path="ch_backup/20181017T210300",
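
Illustrative sketch (not part of the patch): the submit-and-collect pattern that TableBackup._backup now follows. Per-table jobs are submitted to a ThreadPoolExecutor sized by the backup_threads setting, each job returns an upload observer (or None for a skipped table), and validation runs once after all jobs and uploads have finished. The names backup_table, validate_uploaded and backup_all below are hypothetical stand-ins, not ch-backup APIs.

from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import List, Optional


def backup_table(table: str) -> Optional[str]:
    # Stand-in for TableBackup._backup_table: freeze and upload one table,
    # returning an observer describing the uploaded parts, or None if skipped.
    if table.startswith("skip_"):
        return None
    return f"observer({table})"


def validate_uploaded(observer: str) -> None:
    # Stand-in for TableBackup._validate_uploaded_parts.
    print("validating", observer)


def backup_all(tables: List[str], backup_threads: int = 4) -> None:
    upload_observers: List[str] = []
    with ThreadPoolExecutor(max_workers=backup_threads) as executor:
        # One job per table; jobs run concurrently up to backup_threads.
        futures: List[Future] = [executor.submit(backup_table, t) for t in tables]
        for future in as_completed(futures):
            observer = future.result()
            if observer:
                upload_observers.append(observer)
    # Validate only after every per-table job has finished uploading.
    for observer in upload_observers:
        validate_uploaded(observer)


if __name__ == "__main__":
    backup_all(["db1.t1", "db1.t2", "skip_tmp"], backup_threads=2)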