From 4c3acbf9aeaf0e8cc1d5e5a2b947728ecca6a012 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Thu, 20 Jun 2024 01:43:19 +0300 Subject: [PATCH 01/24] Freeze and backup thread pools --- ch_backup/logic/table.py | 105 ++++++++++++++++++++++++++++++++------- 1 file changed, 87 insertions(+), 18 deletions(-) diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index d752bdc3..38a511a1 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -4,6 +4,7 @@ import os from collections import deque +from concurrent.futures import Future, ThreadPoolExecutor from dataclasses import dataclass from functools import partial from itertools import chain @@ -49,6 +50,36 @@ class TableBackup(BackupManager): Table backup class """ + def __init__(self): + self._freeze_executor: Optional[ThreadPoolExecutor] = None + self._backup_executor: Optional[ThreadPoolExecutor] = None + self._backup_futures: Optional[List[Future]] = None + + def _init_backup_executors(self): + self._freeze_executor = ThreadPoolExecutor(max_workers=1) + self._backup_executor = ThreadPoolExecutor(max_workers=1) + self._backup_futures = [] + + def _shutdown_backup_executors(self): + if self._freeze_executor: + self._freeze_executor.shutdown() + if self._backup_executor: + self._backup_executor.shutdown() + if self._backup_futures: + self._wait_backup_executors() + self._backup_futures.clear() + + def _wait_backup_executors(self): + if not self._backup_futures: + return + + for freeze_future in self._backup_futures: + backup_future = freeze_future.result(timeout=30) + if backup_future is not None: + backup_future.result(timeout=30) + + self._backup_futures.clear() + def backup( self, context: BackupContext, @@ -59,6 +90,8 @@ def backup( """ Backup tables metadata, MergeTree data and Cloud storage metadata. """ + self._init_backup_executors() + backup_name = context.backup_meta.get_sanitized_name() for db in databases: @@ -69,6 +102,9 @@ def backup( backup_name, schema_only, ) + + self._shutdown_backup_executors() + self._backup_cloud_storage_metadata(context) def _collect_local_metadata_mtime( @@ -118,15 +154,30 @@ def _backup( if table.name not in mtimes: continue - self._backup_table( - context, - db, - table, - backup_name, - schema_only, - mtimes, + logging.debug( + 'Adding "{}"."{}" to the freeze and backup queue', + table.database, + table.name, + ) + + assert ( + self._backup_futures is not None + and self._freeze_executor is not None + ) + self._backup_futures.append( + self._freeze_executor.submit( + self._backup_table, + context, + db, + table, + backup_name, + schema_only, + mtimes, + ) ) + self._wait_backup_executors() + context.backup_layout.upload_backup_metadata(context.backup_meta) @staticmethod @@ -303,15 +354,8 @@ def _backup_table( backup_name: str, schema_only: bool, mtimes: Dict[str, TableMetadataMtime], - ) -> None: - """ - Make backup of metadata and data of single table. - - Return backup metadata of successfully backuped table, otherwise None. 
- """ - logging.debug( - 'Performing table backup for "{}"."{}"', table.database, table.name - ) + ) -> Optional[Future]: + logging.debug('Trying to freeze "{}"."{}"', table.database, table.name) create_statement = self._load_create_statement_from_disk(table) if not create_statement: logging.warning( @@ -319,7 +363,7 @@ def _backup_table( db.name, table.name, ) - return + return None # Freeze only MergeTree tables if not schema_only and is_merge_tree(table.engine): @@ -334,8 +378,30 @@ def _backup_table( table.database, table.name, ) - return + return None + assert self._backup_executor is not None + return self._backup_executor.submit( + self._backup_table_after_freeze, + context, + db, + table, + backup_name, + schema_only, + mtimes, + create_statement, + ) + + def _backup_table_after_freeze( + self, + context: BackupContext, + db: Database, + table: Table, + backup_name: str, + schema_only: bool, + mtimes: Dict[str, TableMetadataMtime], + create_statement: bytes, + ) -> None: # Check if table metadata was updated new_mtime = self._get_mtime(table.metadata_path) if new_mtime is None or mtimes[table.name].mtime != new_mtime: @@ -347,6 +413,9 @@ def _backup_table( context.ch_ctl.remove_freezed_data() return + logging.debug( + 'Performing table backup for "{}"."{}"', table.database, table.name + ) # Add table metadata to backup metadata context.backup_meta.add_table( TableMetadata(table.database, table.name, table.engine, table.uuid) From faecbed898ba6207bd38802486a434874749abbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Thu, 20 Jun 2024 18:55:47 +0300 Subject: [PATCH 02/24] Remove freezed data for one table --- ch_backup/clickhouse/control.py | 20 ++++++++++++++------ ch_backup/logic/table.py | 8 +++++--- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/ch_backup/clickhouse/control.py b/ch_backup/clickhouse/control.py index 209f7d05..576a3676 100644 --- a/ch_backup/clickhouse/control.py +++ b/ch_backup/clickhouse/control.py @@ -474,15 +474,23 @@ def system_unfreeze(self, backup_name: str) -> None: query_sql = SYSTEM_UNFREEZE_SQL.format(backup_name=backup_name) self._ch_client.query(query_sql, timeout=self._unfreeze_timeout) - def remove_freezed_data(self) -> None: + def remove_freezed_data(self, backup_name: Optional[str] = None, table: Optional[Table] = None) -> None: """ Remove all freezed partitions from all local disks. 
""" - for disk in self._disks.values(): - if disk.type == "local": - shadow_path = os.path.join(disk.path, "shadow") - logging.debug("Removing shadow data: {}", shadow_path) - self._remove_shadow_data(shadow_path) + if backup_name and table: + for table_data_path, disk in table.paths_with_disks: + if disk.type == "local": + table_relative_path = os.path.relpath(table_data_path, disk.path) + shadow_path = os.path.join(disk.path, "shadow", backup_name, table_relative_path) + logging.debug("Removing shadow data: {}", shadow_path) + self._remove_shadow_data(shadow_path) + else: + for disk in self._disks.values(): + if disk.type == "local": + shadow_path = os.path.join(disk.path, "shadow") + logging.debug("Removing shadow data: {}", shadow_path) + self._remove_shadow_data(shadow_path) def remove_freezed_part(self, part: FrozenPart) -> None: """ diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index 38a511a1..3a3bbda4 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -176,7 +176,9 @@ def _backup( ) ) - self._wait_backup_executors() + self._wait_backup_executors() + + context.ch_ctl.remove_freezed_data() context.backup_layout.upload_backup_metadata(context.backup_meta) @@ -410,7 +412,7 @@ def _backup_table_after_freeze( table.database, table.name, ) - context.ch_ctl.remove_freezed_data() + context.ch_ctl.remove_freezed_data(backup_name, table) return logging.debug( @@ -505,7 +507,7 @@ def deduplicate_parts_in_batch( self._validate_uploaded_parts(context, upload_observer.uploaded_parts) - context.ch_ctl.remove_freezed_data() + context.ch_ctl.remove_freezed_data(backup_name, table) @staticmethod def _validate_uploaded_parts(context: BackupContext, uploaded_parts: list) -> None: From 8807c8684576b200c8e343061c7a520f2853a3a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Thu, 20 Jun 2024 18:56:23 +0300 Subject: [PATCH 03/24] Freeze workers option in config --- ch_backup/ch_backup.py | 2 +- ch_backup/config.py | 2 ++ ch_backup/logic/table.py | 15 +++++++++++---- tests/unit/test_backup_tables.py | 8 +++++--- 4 files changed, 19 insertions(+), 8 deletions(-) diff --git a/ch_backup/ch_backup.py b/ch_backup/ch_backup.py index bec5b378..bdde8485 100644 --- a/ch_backup/ch_backup.py +++ b/ch_backup/ch_backup.py @@ -48,7 +48,7 @@ def __init__(self, config: Config) -> None: self._config = config self._access_backup_manager = AccessBackup() self._database_backup_manager = DatabaseBackup() - self._table_backup_manager = TableBackup() + self._table_backup_manager = TableBackup(self._config.get("multiprocessing").get("freeze_workers", 1)) self._udf_backup_manager = UDFBackup() self._nc_backup_manager = NamedCollectionsBackup() diff --git a/ch_backup/config.py b/ch_backup/config.py index 036d86b5..40484f67 100644 --- a/ch_backup/config.py +++ b/ch_backup/config.py @@ -156,6 +156,8 @@ def _as_seconds(t: str) -> int: "workers": 4, # The number of processes for parts restoring from S3 disks. 
"cloud_storage_restore_workers": 4, + # The number of processes for parallel freeze of tables + "freeze_workers": 4, }, "pipeline": { # Is asynchronous pipelines used (based on Pypeln library) diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index 3a3bbda4..26ac3dec 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -50,13 +50,14 @@ class TableBackup(BackupManager): Table backup class """ - def __init__(self): + def __init__(self, freeze_workers: int = 1): self._freeze_executor: Optional[ThreadPoolExecutor] = None self._backup_executor: Optional[ThreadPoolExecutor] = None self._backup_futures: Optional[List[Future]] = None + self._freeze_workers: int = freeze_workers def _init_backup_executors(self): - self._freeze_executor = ThreadPoolExecutor(max_workers=1) + self._freeze_executor = ThreadPoolExecutor(max_workers=self._freeze_workers) self._backup_executor = ThreadPoolExecutor(max_workers=1) self._backup_futures = [] @@ -70,13 +71,16 @@ def _shutdown_backup_executors(self): self._backup_futures.clear() def _wait_backup_executors(self): + """ + Wait for freeze tasks to finish and return backup futures, when wait for backup tasks. + """ if not self._backup_futures: return for freeze_future in self._backup_futures: - backup_future = freeze_future.result(timeout=30) + backup_future = freeze_future.result() if backup_future is not None: - backup_future.result(timeout=30) + backup_future.result() self._backup_futures.clear() @@ -357,6 +361,9 @@ def _backup_table( schema_only: bool, mtimes: Dict[str, TableMetadataMtime], ) -> Optional[Future]: + """ + Freeze table and submit backup task to the backup executor. + """ logging.debug('Trying to freeze "{}"."{}"', table.database, table.name) create_statement = self._load_create_statement_from_disk(table) if not create_statement: diff --git a/tests/unit/test_backup_tables.py b/tests/unit/test_backup_tables.py index 50ce2304..2b32cb2b 100644 --- a/tests/unit/test_backup_tables.py +++ b/tests/unit/test_backup_tables.py @@ -1,4 +1,4 @@ -from typing import List +from typing import Dict, List from unittest.mock import Mock, patch import pytest @@ -28,7 +28,8 @@ def test_backup_table_skipping_if_metadata_updated_during_backup( # Prepare involved data objects context = BackupContext(DEFAULT_CONFIG) # type: ignore[arg-type] db = Database(db_name, "MergeTree", "/var/lib/clickhouse/metadata/db1.sql") - table_backup = TableBackup() + multiprocessing_conf: Dict[str, int] = DEFAULT_CONFIG.get("multiprocessing") # type: ignore[assignment] + table_backup = TableBackup(multiprocessing_conf.get("freeze_workers", 1)) backup_meta = BackupMetadata( name="20181017T210300", path="ch_backup/20181017T210300", @@ -69,4 +70,5 @@ def test_backup_table_skipping_if_metadata_updated_during_backup( table_backup.backup(context, [db], {db_name: [table_name]}, schema_only=False) assert len(context.backup_meta.get_tables(db_name)) == backups_expected - assert clickhouse_ctl_mock.remove_freezed_data.call_count == 1 + # One call after each table and one after database is backuped + assert clickhouse_ctl_mock.remove_freezed_data.call_count == 2 From 1e86b09006044d319359cd451f700d45a45e7248 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Sat, 22 Jun 2024 19:00:51 +0300 Subject: [PATCH 04/24] More flexibility of data generation for tests --- tests/integration/modules/clickhouse.py | 80 +++++++++++++++++-------- tests/integration/steps/clickhouse.py | 40 
+++++++++++++ 2 files changed, 95 insertions(+), 25 deletions(-) diff --git a/tests/integration/modules/clickhouse.py b/tests/integration/modules/clickhouse.py index f0989dd9..4a43ffee 100644 --- a/tests/integration/modules/clickhouse.py +++ b/tests/integration/modules/clickhouse.py @@ -4,8 +4,8 @@ import logging from copy import copy -from datetime import datetime -from typing import Any, Sequence, Tuple, Union +from datetime import datetime, timedelta +from typing import Any, List, Sequence, Tuple, Union from urllib.parse import urljoin from pkg_resources import parse_version @@ -18,6 +18,7 @@ DB_COUNT = 2 TABLE_COUNT = 2 ROWS_COUNT = 3 +PARTITIONS_COUNT = 1 ACCESS_TYPES = [ ("users", "USER"), @@ -91,14 +92,18 @@ def ch_version_ge(self, comparing_version: str) -> bool: """ return parse_version(self.get_version()) >= parse_version(comparing_version) # type: ignore - def init_schema(self) -> None: + def init_schema( + self, + db_count: int = DB_COUNT, + table_count: int = TABLE_COUNT, + ) -> None: """ Create test schema. """ - for db_num in range(1, DB_COUNT + 1): + for db_num in range(1, db_count + 1): db_name = f"test_db_{db_num:02d}" self._query("POST", f"CREATE DATABASE IF NOT EXISTS {db_name}") - for table_num in range(1, TABLE_COUNT + 1): + for table_num in range(1, table_count + 1): table_name = f"test_table_{table_num:02d}" query = f""" CREATE TABLE IF NOT EXISTS `{db_name}`.`{table_name}` ( @@ -113,26 +118,36 @@ def init_schema(self) -> None: """ self._query("POST", query) - def init_data(self, mark: str) -> None: + def init_data( + self, + mark: str, + db_count: int = DB_COUNT, + table_count: int = TABLE_COUNT, + rows_count: int = ROWS_COUNT, + partitions_count: int = PARTITIONS_COUNT, + ) -> None: """ Fill test schema with data """ - for db_num in range(1, DB_COUNT + 1): + for db_num in range(1, db_count + 1): db_name = self._get_test_db_name(db_num) - for table_num in range(1, TABLE_COUNT + 1): - rows = [] + for table_num in range(1, table_count + 1): table_name = self._get_test_table_name(table_num) - for row_num in range(1, ROWS_COUNT + 1): - rows.append( - ", ".join(self._gen_record(row_num=row_num, str_prefix=mark)) - ) + rows = self._gen_rows( + rows_count=rows_count, + str_prefix=mark, + partitions_count=partitions_count, + ) self._query( "POST", - f"INSERT INTO {db_name}.{table_name} FORMAT CSV", + f"INSERT INTO `{db_name}`.`{table_name}` FORMAT CSV", data="\n".join(rows), ) + # Make all possible merges to make tests more determinated + self._query("POST", f"OPTIMIZE TABLE `{db_name}`.`{table_name}`") + def get_all_user_data(self) -> Tuple[int, dict]: """ Retrieve all user data. @@ -322,23 +337,38 @@ def _get_test_table_name(table_num: int) -> str: return f"test_table_{table_num:02d}" @staticmethod - def _gen_record(row_num=0, str_len=5, str_prefix=None): + def _gen_rows( + rows_count=ROWS_COUNT, + str_len=5, + str_prefix=None, + partitions_count=PARTITIONS_COUNT, + ): """ - Generate test record. + Generate test rows. 
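+        Each row is rendered as a CSV line matching the test schema columns
+        (date, datetime, int_num, str), with rows spread over
+        ``partitions_count`` distinct dates. A hypothetical generated row:
+        "2024-06-22, 2024-06-22 19:00:51, 1, test1_ab1de".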
""" + rows: List[str] = [] + if str_prefix is None: str_prefix = "" else: str_prefix = f"{str_prefix}_" - rand_str = generate_random_string(str_len) - + dates: List[datetime] = [] dt_now = datetime.utcnow() - row = ( - dt_now.strftime("%Y-%m-%d"), - dt_now.strftime("%Y-%m-%d %H:%M:%S"), - str(row_num), - f"{str_prefix}{rand_str}", - ) + # PARTITION BY date + for i in range(partitions_count): + date = dt_now + timedelta(days=i) + dates.append(date) + + for row_num in range(1, rows_count + 1): + rand_str = generate_random_string(str_len) + date = dates[row_num % partitions_count] + row = ( + date.strftime("%Y-%m-%d"), + date.strftime("%Y-%m-%d %H:%M:%S"), + str(row_num), + f"{str_prefix}{rand_str}", + ) + rows.append(", ".join(row)) - return row + return rows diff --git a/tests/integration/steps/clickhouse.py b/tests/integration/steps/clickhouse.py index 9cf9b245..91fffe64 100644 --- a/tests/integration/steps/clickhouse.py +++ b/tests/integration/steps/clickhouse.py @@ -30,6 +30,19 @@ def step_init_test_schema(context, node): ClickhouseClient(context, node).init_schema() +@given( + "clickhouse on {node:w} has test schema with {db_count:d} databases and {tables_count:d} tables" +) +@when( + "clickhouse on {node:w} has test schema with {db_count:d} databases and {tables_count:d} tables" +) +def step_init_test_schema_custom(context, node, db_count, tables_count): + """ + Load test schema to clickhouse. + """ + ClickhouseClient(context, node).init_schema(db_count, tables_count) + + @when('we put following info in "{path}" at {node:w}') def step_put_file(context, path, node): container = get_container(context, node) @@ -45,6 +58,33 @@ def step_fill_with_test_data(context, node, test_name): ClickhouseClient(context, node).init_data(mark=test_name) +@given( + "{node:w} has test clickhouse data {test_name:w} with {db_count:d} databases, {tables_count:d} tables, {rows_count:d} rows and {partitions_count:d} partitions" +) +@when( + "{node:w} has test clickhouse data {test_name:w} with {db_count:d} databases, {tables_count:d} tables, {rows_count:d} rows and {partitions_count:d} partitions" +) +def step_fill_with_test_data_custom( + context, + node, + test_name, + db_count, + tables_count, + rows_count, + partitions_count, +): + """ + Load test data to clickhouse. 
+ """ + ClickhouseClient(context, node).init_data( + mark=test_name, + db_count=db_count, + table_count=tables_count, + rows_count=rows_count, + partitions_count=partitions_count, + ) + + @given("we execute query on {node:w}") @when("we execute query on {node:w}") def step_test_request(context, node): From 6b7182b74f80c073725b62bcd635d46ee12927d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Sat, 22 Jun 2024 19:01:06 +0300 Subject: [PATCH 05/24] Parallel freeze test --- tests/integration/ch_backup.featureset | 1 + .../features/freeze_parallel.feature | 31 +++++++++++++++++++ 2 files changed, 32 insertions(+) create mode 100644 tests/integration/features/freeze_parallel.feature diff --git a/tests/integration/ch_backup.featureset b/tests/integration/ch_backup.featureset index 163306d8..ddcb56e8 100644 --- a/tests/integration/ch_backup.featureset +++ b/tests/integration/ch_backup.featureset @@ -12,6 +12,7 @@ features/cloud_storage.feature features/database_engines.feature features/ddl_dictionary.feature features/deduplication.feature +features/freeze_parallel.feature features/incremental_restore.feature features/metadata.feature features/min_interval.feature diff --git a/tests/integration/features/freeze_parallel.feature b/tests/integration/features/freeze_parallel.feature new file mode 100644 index 00000000..7ab31028 --- /dev/null +++ b/tests/integration/features/freeze_parallel.feature @@ -0,0 +1,31 @@ +Feature: Parallel freeze + + Background: + Given default configuration + And a working s3 + And a working zookeeper on zookeeper01 + And a working clickhouse on clickhouse01 + And a working clickhouse on clickhouse02 + And clickhouse on clickhouse01 has test schema with 5 databases and 10 tables + And clickhouse01 has test clickhouse data test1 with 5 databases, 10 tables, 100 rows and 5 partitions + + Scenario: Create backup with single freeze worker + Given ch-backup configuration on clickhouse01 + """ + multiprocessing: + freeze_workers: 1 + """ + When we create clickhouse01 clickhouse backup + Then we got the following backups on clickhouse01 + | num | state | data_count | link_count | title | + | 0 | created | 250 | 0 | shared | + When we restore clickhouse backup #0 to clickhouse02 + Then we got same clickhouse data at clickhouse01 clickhouse02 + + Scenario: Create backup with default number of freeze workers + When we create clickhouse01 clickhouse backup + Then we got the following backups on clickhouse01 + | num | state | data_count | link_count | title | + | 0 | created | 250 | 0 | shared | + When we restore clickhouse backup #0 to clickhouse02 + Then we got same clickhouse data at clickhouse01 clickhouse02 From c45b386c6ffc541d339bf4218b7759894747b8b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Sat, 22 Jun 2024 19:17:10 +0300 Subject: [PATCH 06/24] Move executors to subclass --- ch_backup/ch_backup.py | 4 +- ch_backup/clickhouse/control.py | 8 +- ch_backup/logic/table.py | 129 ++++++++++++++++++++------------ 3 files changed, 89 insertions(+), 52 deletions(-) diff --git a/ch_backup/ch_backup.py b/ch_backup/ch_backup.py index bdde8485..f2f514fd 100644 --- a/ch_backup/ch_backup.py +++ b/ch_backup/ch_backup.py @@ -48,7 +48,9 @@ def __init__(self, config: Config) -> None: self._config = config self._access_backup_manager = AccessBackup() self._database_backup_manager = DatabaseBackup() - 
self._table_backup_manager = TableBackup(self._config.get("multiprocessing").get("freeze_workers", 1)) + self._table_backup_manager = TableBackup( + self._config.get("multiprocessing").get("freeze_workers", 1) + ) self._udf_backup_manager = UDFBackup() self._nc_backup_manager = NamedCollectionsBackup() diff --git a/ch_backup/clickhouse/control.py b/ch_backup/clickhouse/control.py index 576a3676..6f4f6816 100644 --- a/ch_backup/clickhouse/control.py +++ b/ch_backup/clickhouse/control.py @@ -474,7 +474,9 @@ def system_unfreeze(self, backup_name: str) -> None: query_sql = SYSTEM_UNFREEZE_SQL.format(backup_name=backup_name) self._ch_client.query(query_sql, timeout=self._unfreeze_timeout) - def remove_freezed_data(self, backup_name: Optional[str] = None, table: Optional[Table] = None) -> None: + def remove_freezed_data( + self, backup_name: Optional[str] = None, table: Optional[Table] = None + ) -> None: """ Remove all freezed partitions from all local disks. """ @@ -482,7 +484,9 @@ def remove_freezed_data(self, backup_name: Optional[str] = None, table: Optional for table_data_path, disk in table.paths_with_disks: if disk.type == "local": table_relative_path = os.path.relpath(table_data_path, disk.path) - shadow_path = os.path.join(disk.path, "shadow", backup_name, table_relative_path) + shadow_path = os.path.join( + disk.path, "shadow", backup_name, table_relative_path + ) logging.debug("Removing shadow data: {}", shadow_path) self._remove_shadow_data(shadow_path) else: diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index 26ac3dec..9c4537d8 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -9,7 +9,7 @@ from functools import partial from itertools import chain from pathlib import Path -from typing import Dict, Iterable, List, Optional, Sequence, Tuple +from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple from ch_backup import logging from ch_backup.backup.deduplication import deduplicate_parts @@ -50,39 +50,77 @@ class TableBackup(BackupManager): Table backup class """ - def __init__(self, freeze_workers: int = 1): - self._freeze_executor: Optional[ThreadPoolExecutor] = None - self._backup_executor: Optional[ThreadPoolExecutor] = None - self._backup_futures: Optional[List[Future]] = None - self._freeze_workers: int = freeze_workers - - def _init_backup_executors(self): - self._freeze_executor = ThreadPoolExecutor(max_workers=self._freeze_workers) - self._backup_executor = ThreadPoolExecutor(max_workers=1) - self._backup_futures = [] - - def _shutdown_backup_executors(self): - if self._freeze_executor: - self._freeze_executor.shutdown() - if self._backup_executor: - self._backup_executor.shutdown() - if self._backup_futures: - self._wait_backup_executors() - self._backup_futures.clear() - - def _wait_backup_executors(self): + class BackupExecutor: """ - Wait for freeze tasks to finish and return backup futures, when wait for backup tasks. + Allows to freeze and backup tables in parallel. """ - if not self._backup_futures: - return - for freeze_future in self._backup_futures: - backup_future = freeze_future.result() - if backup_future is not None: - backup_future.result() + def __init__(self, freeze_workers): + self._freeze_executor: Optional[ThreadPoolExecutor] = None + self._backup_executor: Optional[ThreadPoolExecutor] = None + self._freeze_futures: Optional[List[Future]] = None + self._freeze_workers: int = freeze_workers + + def init(self): + """ + Init executors. 
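+            Expected lifecycle, sketched from how TableBackup drives it:
+
+                executor.init()
+                executor.submit_freeze_task(...)  # freeze tasks chain backup tasks
+                executor.wait()                   # once per database
+                executor.shutdown()               # after the whole backup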
+ """ + self._freeze_executor = ThreadPoolExecutor(max_workers=self._freeze_workers) + self._backup_executor = ThreadPoolExecutor(max_workers=1) + self._freeze_futures = [] + + def shutdown(self): + """ + Shutdown executors to clean-up associated resources. + """ + if self._freeze_executor: + self._freeze_executor.shutdown() + if self._backup_executor: + self._backup_executor.shutdown() + if self._freeze_futures: + self.wait() + self._freeze_futures.clear() + + def wait(self): + """ + Wait for freeze tasks to finish and return backup futures, when wait for backup futures to finish. + """ + if not self._freeze_futures: + return + + for freeze_future in self._freeze_futures: + backup_future = freeze_future.result() + if backup_future is not None: + backup_future.result() + + self._freeze_futures.clear() + + def submit_freeze_task( + self, fn: Callable, /, *args: Any, **kwargs: Any + ) -> Future: + """ + Submit freeze task to the executor. + Task is expected to submit backup task to the backup executor as a result. + """ + assert ( + self._freeze_futures is not None and self._freeze_executor is not None + ) + result = self._freeze_executor.submit(fn, *args, **kwargs) + self._freeze_futures.append(result) + return result + + def submit_backup_task( + self, fn: Callable, /, *args: Any, **kwargs: Any + ) -> Future: + """ + Submit backup task to the executor. + This method is expected to be used inside freeze task. + """ + assert self._backup_executor is not None + return self._backup_executor.submit(fn, *args, **kwargs) - self._backup_futures.clear() + def __init__(self, freeze_workers: int = 1): + self._backup_executor = self.BackupExecutor(freeze_workers) def backup( self, @@ -94,7 +132,7 @@ def backup( """ Backup tables metadata, MergeTree data and Cloud storage metadata. 
""" - self._init_backup_executors() + self._backup_executor.init() backup_name = context.backup_meta.get_sanitized_name() @@ -107,7 +145,7 @@ def backup( schema_only, ) - self._shutdown_backup_executors() + self._backup_executor.shutdown() self._backup_cloud_storage_metadata(context) @@ -164,23 +202,17 @@ def _backup( table.name, ) - assert ( - self._backup_futures is not None - and self._freeze_executor is not None - ) - self._backup_futures.append( - self._freeze_executor.submit( - self._backup_table, - context, - db, - table, - backup_name, - schema_only, - mtimes, - ) + self._backup_executor.submit_freeze_task( + self._backup_table, + context, + db, + table, + backup_name, + schema_only, + mtimes, ) - self._wait_backup_executors() + self._backup_executor.wait() context.ch_ctl.remove_freezed_data() @@ -389,8 +421,7 @@ def _backup_table( ) return None - assert self._backup_executor is not None - return self._backup_executor.submit( + return self._backup_executor.submit_backup_task( self._backup_table_after_freeze, context, db, From ec20451cbc25cf8352c2909c6910de462846c0db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Sat, 22 Jun 2024 20:27:52 +0300 Subject: [PATCH 07/24] Partition by prefix to fix dedup tests --- tests/integration/modules/clickhouse.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/integration/modules/clickhouse.py b/tests/integration/modules/clickhouse.py index 4a43ffee..51480b44 100644 --- a/tests/integration/modules/clickhouse.py +++ b/tests/integration/modules/clickhouse.py @@ -110,10 +110,11 @@ def init_schema( date Date, datetime DateTime, int_num UInt32, + prefix String, str String ) ENGINE MergeTree - PARTITION BY date + PARTITION BY (date, prefix) ORDER BY int_num """ self._query("POST", query) @@ -367,7 +368,8 @@ def _gen_rows( date.strftime("%Y-%m-%d"), date.strftime("%Y-%m-%d %H:%M:%S"), str(row_num), - f"{str_prefix}{rand_str}", + f"{str_prefix}", + f"{rand_str}", ) rows.append(", ".join(row)) From 8b795aec264b023a99126a865ddbab60163f1759 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Mon, 24 Jun 2024 13:19:16 +0300 Subject: [PATCH 08/24] Fix codespell --- tests/integration/modules/clickhouse.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/modules/clickhouse.py b/tests/integration/modules/clickhouse.py index 51480b44..a37dd000 100644 --- a/tests/integration/modules/clickhouse.py +++ b/tests/integration/modules/clickhouse.py @@ -146,7 +146,7 @@ def init_data( data="\n".join(rows), ) - # Make all possible merges to make tests more determinated + # Make all possible merges to make tests more determined self._query("POST", f"OPTIMIZE TABLE `{db_name}`.`{table_name}`") def get_all_user_data(self) -> Tuple[int, dict]: From 4f8bfec38a115b6df56df553cb726e678d5a3649 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Mon, 24 Jun 2024 13:27:12 +0300 Subject: [PATCH 09/24] Fix for old python --- ch_backup/logic/table.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index 9c4537d8..b8dc6d4e 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -95,9 +95,7 @@ def wait(self): self._freeze_futures.clear() - def submit_freeze_task( - self, fn: 
Callable, /, *args: Any, **kwargs: Any - ) -> Future: + def submit_freeze_task(self, fn: Callable, *args: Any, **kwargs: Any) -> Future: """ Submit freeze task to the executor. Task is expected to submit backup task to the backup executor as a result. @@ -109,9 +107,7 @@ def submit_freeze_task( self._freeze_futures.append(result) return result - def submit_backup_task( - self, fn: Callable, /, *args: Any, **kwargs: Any - ) -> Future: + def submit_backup_task(self, fn: Callable, *args: Any, **kwargs: Any) -> Future: """ Submit backup task to the executor. This method is expected to be used inside freeze task. From 75548702e87b6d7df7a88179ecf25155badc6428 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Tue, 25 Jun 2024 20:50:41 +0300 Subject: [PATCH 10/24] Remove backup executor and backup in the main thread --- ch_backup/logic/table.py | 128 +++++++++++---------------------------- 1 file changed, 34 insertions(+), 94 deletions(-) diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index b8dc6d4e..fb060af2 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -4,12 +4,12 @@ import os from collections import deque -from concurrent.futures import Future, ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from functools import partial from itertools import chain from pathlib import Path -from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple +from typing import Callable, Dict, Iterable, List, Optional, Sequence, Tuple from ch_backup import logging from ch_backup.backup.deduplication import deduplicate_parts @@ -50,73 +50,8 @@ class TableBackup(BackupManager): Table backup class """ - class BackupExecutor: - """ - Allows to freeze and backup tables in parallel. - """ - - def __init__(self, freeze_workers): - self._freeze_executor: Optional[ThreadPoolExecutor] = None - self._backup_executor: Optional[ThreadPoolExecutor] = None - self._freeze_futures: Optional[List[Future]] = None - self._freeze_workers: int = freeze_workers - - def init(self): - """ - Init executors. - """ - self._freeze_executor = ThreadPoolExecutor(max_workers=self._freeze_workers) - self._backup_executor = ThreadPoolExecutor(max_workers=1) - self._freeze_futures = [] - - def shutdown(self): - """ - Shutdown executors to clean-up associated resources. - """ - if self._freeze_executor: - self._freeze_executor.shutdown() - if self._backup_executor: - self._backup_executor.shutdown() - if self._freeze_futures: - self.wait() - self._freeze_futures.clear() - - def wait(self): - """ - Wait for freeze tasks to finish and return backup futures, when wait for backup futures to finish. - """ - if not self._freeze_futures: - return - - for freeze_future in self._freeze_futures: - backup_future = freeze_future.result() - if backup_future is not None: - backup_future.result() - - self._freeze_futures.clear() - - def submit_freeze_task(self, fn: Callable, *args: Any, **kwargs: Any) -> Future: - """ - Submit freeze task to the executor. - Task is expected to submit backup task to the backup executor as a result. 
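Rough shape of the resulting future chain, as consumed by ``wait``:

    freeze_future = executor.submit_freeze_task(self._backup_table, ...)
    backup_future = freeze_future.result()   # freeze step finished
    if backup_future is not None:
        backup_future.result()               # backup step finished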
- """ - assert ( - self._freeze_futures is not None and self._freeze_executor is not None - ) - result = self._freeze_executor.submit(fn, *args, **kwargs) - self._freeze_futures.append(result) - return result - - def submit_backup_task(self, fn: Callable, *args: Any, **kwargs: Any) -> Future: - """ - Submit backup task to the executor. - This method is expected to be used inside freeze task. - """ - assert self._backup_executor is not None - return self._backup_executor.submit(fn, *args, **kwargs) - def __init__(self, freeze_workers: int = 1): - self._backup_executor = self.BackupExecutor(freeze_workers) + self._freeze_workers = freeze_workers def backup( self, @@ -128,7 +63,6 @@ def backup( """ Backup tables metadata, MergeTree data and Cloud storage metadata. """ - self._backup_executor.init() backup_name = context.backup_meta.get_sanitized_name() @@ -141,8 +75,6 @@ def backup( schema_only, ) - self._backup_executor.shutdown() - self._backup_cloud_storage_metadata(context) def _collect_local_metadata_mtime( @@ -188,29 +120,37 @@ def _backup( # See https://en.wikipedia.org/wiki/Optimistic_concurrency_control mtimes = self._collect_local_metadata_mtime(context, db, tables) - for table in context.ch_ctl.get_tables(db.name, tables): - if table.name not in mtimes: - continue + with ThreadPoolExecutor(max_workers=self._freeze_workers) as freeze_executor: + freeze_futures = [] - logging.debug( - 'Adding "{}"."{}" to the freeze and backup queue', - table.database, - table.name, - ) + for table in context.ch_ctl.get_tables(db.name, tables): + if table.name not in mtimes: + continue - self._backup_executor.submit_freeze_task( - self._backup_table, - context, - db, - table, - backup_name, - schema_only, - mtimes, - ) + logging.debug( + 'Adding "{}"."{}" to the freeze and backup queue', + table.database, + table.name, + ) + + freeze_futures.append( + freeze_executor.submit( + self._freeze_table, + context, + db, + table, + backup_name, + schema_only, + mtimes, + ) + ) - self._backup_executor.wait() + for freeze_future in as_completed(freeze_futures): + backup_freezed_table = freeze_future.result() + if backup_freezed_table is not None: + backup_freezed_table() - context.ch_ctl.remove_freezed_data() + context.ch_ctl.remove_freezed_data() context.backup_layout.upload_backup_metadata(context.backup_meta) @@ -380,7 +320,7 @@ def _load_create_statement_from_disk(table: Table) -> Optional[bytes]: ) return None - def _backup_table( + def _freeze_table( self, context: BackupContext, db: Database, @@ -388,9 +328,9 @@ def _backup_table( backup_name: str, schema_only: bool, mtimes: Dict[str, TableMetadataMtime], - ) -> Optional[Future]: + ) -> Optional[Callable]: """ - Freeze table and submit backup task to the backup executor. + Freeze table and return function which backups freezed table. 
""" logging.debug('Trying to freeze "{}"."{}"', table.database, table.name) create_statement = self._load_create_statement_from_disk(table) @@ -417,7 +357,7 @@ def _backup_table( ) return None - return self._backup_executor.submit_backup_task( + return partial( self._backup_table_after_freeze, context, db, From 06cb09740c0a70035fdb937a3e6b6f024e6ba08c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Tue, 25 Jun 2024 20:52:10 +0300 Subject: [PATCH 11/24] Fix black --- ch_backup/logic/table.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index fb060af2..03c04b5f 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -120,7 +120,9 @@ def _backup( # See https://en.wikipedia.org/wiki/Optimistic_concurrency_control mtimes = self._collect_local_metadata_mtime(context, db, tables) - with ThreadPoolExecutor(max_workers=self._freeze_workers) as freeze_executor: + with ThreadPoolExecutor( + max_workers=self._freeze_workers + ) as freeze_executor: freeze_futures = [] for table in context.ch_ctl.get_tables(db.name, tables): From 95bc77b02bc2ecc00c91fd30cee7c1cbaa671063 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Wed, 26 Jun 2024 02:34:45 +0300 Subject: [PATCH 12/24] Fix race condition with shadow/increment.txt --- ch_backup/clickhouse/control.py | 21 +++++++++++++++++++++ ch_backup/logic/table.py | 4 ++++ ch_backup/util.py | 7 +++++++ 3 files changed, 32 insertions(+) diff --git a/ch_backup/clickhouse/control.py b/ch_backup/clickhouse/control.py index 6f4f6816..10714154 100644 --- a/ch_backup/clickhouse/control.py +++ b/ch_backup/clickhouse/control.py @@ -24,6 +24,7 @@ from ch_backup.exceptions import ClickhouseBackupError from ch_backup.util import ( chown_dir_contents, + chown_file, escape, list_dir_files, retry, @@ -788,6 +789,26 @@ def chown_dir(self, dir_path: str) -> None: need_recursion, ) + def create_shadow_increment(self) -> None: + """ + Create shadow/increment.txt to fix race condition with parallel freeze. + Must be used before freezing more than one table at once. 
+ """ + default_shadow_path = Path(self._root_data_path) / "shadow" + increment_path = default_shadow_path / "increment.txt" + if os.path.exists(increment_path): + return + if not os.path.exists(default_shadow_path): + os.mkdir(default_shadow_path) + self.chown_dir(str(default_shadow_path)) + with open(increment_path, "w", encoding="utf-8") as file: + file.write("0") + chown_file( + self._ch_ctl_config["user"], + self._ch_ctl_config["group"], + str(increment_path), + ) + @retry(OSError) def _remove_shadow_data(self, path: str) -> None: if path.find("/shadow") == -1: diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index 03c04b5f..0e6f07e2 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -120,6 +120,10 @@ def _backup( # See https://en.wikipedia.org/wiki/Optimistic_concurrency_control mtimes = self._collect_local_metadata_mtime(context, db, tables) + # Create shadow/increment.txt if not exists manually to avoid + # race condition with parallel freeze + context.ch_ctl.create_shadow_increment() + with ThreadPoolExecutor( max_workers=self._freeze_workers ) as freeze_executor: diff --git a/ch_backup/util.py b/ch_backup/util.py index 8070781a..1b23bb71 100644 --- a/ch_backup/util.py +++ b/ch_backup/util.py @@ -78,6 +78,13 @@ def chown_dir_contents( shutil.chown(os.path.join(dir_path, path), user, group) +def chown_file(user: str, group: str, file_path: str) -> None: + """ + Change directory user/group + """ + shutil.chown(file_path, user, group) + + def list_dir_files(dir_path: str) -> List[str]: """ Returns paths of all files of directory (recursively), relative to its path From 0214f1ef7e18c76437d45d7e023b607b5478312c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Tue, 2 Jul 2024 02:18:06 +0300 Subject: [PATCH 13/24] Parallel backup --- ch_backup/backup/metadata/backup_metadata.py | 27 ++-- ch_backup/ch_backup.py | 2 +- ch_backup/clickhouse/control.py | 25 ++-- ch_backup/config.py | 4 +- ch_backup/logic/table.py | 136 ++++++++++-------- tests/integration/ch_backup.featureset | 2 +- ...rallel.feature => backup_parallel.feature} | 8 +- tests/unit/test_backup_tables.py | 2 +- 8 files changed, 112 insertions(+), 94 deletions(-) rename tests/integration/features/{freeze_parallel.feature => backup_parallel.feature} (87%) diff --git a/ch_backup/backup/metadata/backup_metadata.py b/ch_backup/backup/metadata/backup_metadata.py index 546ec6da..a686345a 100644 --- a/ch_backup/backup/metadata/backup_metadata.py +++ b/ch_backup/backup/metadata/backup_metadata.py @@ -6,6 +6,7 @@ import socket from datetime import datetime, timezone from enum import Enum +from threading import Lock from typing import Any, Dict, List, Optional, Sequence from ch_backup.backup.metadata.access_control_metadata import AccessControlMetadata @@ -70,6 +71,8 @@ def __init__( self._user_defined_functions: List[str] = [] self._named_collections: List[str] = [] + self._lock = Lock() + def __str__(self) -> str: return self.dump_json() @@ -264,16 +267,17 @@ def add_table(self, table: TableMetadata) -> None: """ Add table to backup metadata. 
""" - tables = self._databases[table.database]["tables"] + with self._lock: + tables = self._databases[table.database]["tables"] - assert table.name not in tables + assert table.name not in tables - tables[table.name] = table.raw_metadata + tables[table.name] = table.raw_metadata - for part in table.get_parts(): - self.size += part.size - if not part.link: - self.real_size += part.size + for part in table.get_parts(): + self.size += part.size + if not part.link: + self.real_size += part.size def add_udf(self, udf_name: str) -> None: """ @@ -328,11 +332,12 @@ def add_part(self, part: PartMetadata) -> None: """ Add data part to backup metadata. """ - self.get_table(part.database, part.table).add_part(part) + with self._lock: + self.get_table(part.database, part.table).add_part(part) - self.size += part.size - if not part.link: - self.real_size += part.size + self.size += part.size + if not part.link: + self.real_size += part.size def remove_parts(self, table: TableMetadata, parts: List[PartMetadata]) -> None: """ diff --git a/ch_backup/ch_backup.py b/ch_backup/ch_backup.py index f2f514fd..ff745b85 100644 --- a/ch_backup/ch_backup.py +++ b/ch_backup/ch_backup.py @@ -49,7 +49,7 @@ def __init__(self, config: Config) -> None: self._access_backup_manager = AccessBackup() self._database_backup_manager = DatabaseBackup() self._table_backup_manager = TableBackup( - self._config.get("multiprocessing").get("freeze_workers", 1) + self._config.get("multiprocessing").get("backup_threads", 1) ) self._udf_backup_manager = UDFBackup() self._nc_backup_manager = NamedCollectionsBackup() diff --git a/ch_backup/clickhouse/control.py b/ch_backup/clickhouse/control.py index 10714154..34a8b422 100644 --- a/ch_backup/clickhouse/control.py +++ b/ch_backup/clickhouse/control.py @@ -210,9 +210,12 @@ CREATE TABLE IF NOT EXISTS `{system_db}`._deduplication_info_current ( name String, checksum String, + database String, + table String, ) ENGINE = MergeTree() ORDER BY (name, checksum) + PARTITION BY (database, table) """ ) @@ -954,12 +957,13 @@ def create_deduplication_table(self): system_db=escape(self._backup_config["system_database"]) ) ) - self._ch_client.query( - TRUNCATE_TABLE_IF_EXISTS_SQL.format( - db_name=escape(self._backup_config["system_database"]), - table_name="_deduplication_info", + for table_name in ["_deduplication_info", "_deduplication_info_current"]: + self._ch_client.query( + TRUNCATE_TABLE_IF_EXISTS_SQL.format( + db_name=escape(self._backup_config["system_database"]), + table_name=table_name, + ) ) - ) self._ch_client.query( CREATE_IF_NOT_EXISTS_DEDUP_TABLE_SQL.format( system_db=escape(self._backup_config["system_database"]) @@ -985,19 +989,16 @@ def get_deduplication_info( """ Get deduplication info for given frozen parts of a table """ - self._ch_client.query( - TRUNCATE_TABLE_IF_EXISTS_SQL.format( - db_name=escape(self._backup_config["system_database"]), - table_name="_deduplication_info_current", - ) - ) self._ch_client.query( CREATE_IF_NOT_EXISTS_DEDUP_TABLE_CURRENT_SQL.format( system_db=escape(self._backup_config["system_database"]) ) ) - batch = [f"('{part.name}','{part.checksum}')" for part in frozen_parts.values()] + batch = [ + f"('{part.name}','{part.checksum}','{database}','{table}')" + for part in frozen_parts.values() + ] self._ch_client.query( INSERT_DEDUP_INFO_BATCH_SQL.format( system_db=escape(self._backup_config["system_database"]), diff --git a/ch_backup/config.py b/ch_backup/config.py index 40484f67..4e4c3eaa 100644 --- a/ch_backup/config.py +++ b/ch_backup/config.py @@ -156,8 
+156,8 @@ def _as_seconds(t: str) -> int: "workers": 4, # The number of processes for parts restoring from S3 disks. "cloud_storage_restore_workers": 4, - # The number of processes for parallel freeze of tables - "freeze_workers": 4, + # The number of threads for parallel freeze and backup of tables + "backup_threads": 4, }, "pipeline": { # Is asynchronous pipelines used (based on Pypeln library) diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index 0e6f07e2..4bd97578 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -4,12 +4,12 @@ import os from collections import deque -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import Future, ThreadPoolExecutor, as_completed from dataclasses import dataclass from functools import partial from itertools import chain from pathlib import Path -from typing import Callable, Dict, Iterable, List, Optional, Sequence, Tuple +from typing import Dict, Iterable, List, Optional, Sequence, Tuple from ch_backup import logging from ch_backup.backup.deduplication import deduplicate_parts @@ -50,8 +50,8 @@ class TableBackup(BackupManager): Table backup class """ - def __init__(self, freeze_workers: int = 1): - self._freeze_workers = freeze_workers + def __init__(self, backup_threads: int = 1): + self._backup_threads = backup_threads def backup( self, @@ -125,9 +125,10 @@ def _backup( context.ch_ctl.create_shadow_increment() with ThreadPoolExecutor( - max_workers=self._freeze_workers - ) as freeze_executor: - freeze_futures = [] + max_workers=self._backup_threads + ) as backup_executor: + backup_futures: List[Future] = [] + upload_observers: List[UploadPartObserver] = [] for table in context.ch_ctl.get_tables(db.name, tables): if table.name not in mtimes: @@ -139,9 +140,9 @@ def _backup( table.name, ) - freeze_futures.append( - freeze_executor.submit( - self._freeze_table, + backup_futures.append( + backup_executor.submit( + self._backup_table, context, db, table, @@ -151,15 +152,51 @@ def _backup( ) ) - for freeze_future in as_completed(freeze_futures): - backup_freezed_table = freeze_future.result() - if backup_freezed_table is not None: - backup_freezed_table() + for backup_future in as_completed(backup_futures): + upload_observer = backup_future.result() + if upload_observer: + upload_observers.append(upload_observer) + + context.backup_layout.wait() + + logging.debug( + 'All tables from "{}" are processed, validating uploaded parts.', + db.name, + ) + for upload_observer in upload_observers: + self._validate_uploaded_parts( + context, upload_observer.uploaded_parts + ) context.ch_ctl.remove_freezed_data() context.backup_layout.upload_backup_metadata(context.backup_meta) + def _backup_table( + self, + context: BackupContext, + db: Database, + table: Table, + backup_name: str, + schema_only: bool, + mtimes: Dict[str, TableMetadataMtime], + ) -> Optional[UploadPartObserver]: + create_statement = self._load_create_statement_from_disk(table) + if not create_statement: + logging.warning( + 'Skipping table backup for "{}"."{}". 
Local metadata is empty or absent', + db.name, + table.name, + ) + return None + # Freeze only MergeTree tables + if not schema_only and is_merge_tree(table.engine): + if not self._freeze_table(context, table, backup_name): + return None + return self._backup_table_after_freeze( + context, db, table, backup_name, schema_only, mtimes, create_statement + ) + @staticmethod def _backup_cloud_storage_metadata(context: BackupContext) -> None: """ @@ -329,50 +366,27 @@ def _load_create_statement_from_disk(table: Table) -> Optional[bytes]: def _freeze_table( self, context: BackupContext, - db: Database, table: Table, backup_name: str, - schema_only: bool, - mtimes: Dict[str, TableMetadataMtime], - ) -> Optional[Callable]: + ) -> bool: """ - Freeze table and return function which backups freezed table. + Freeze table. """ logging.debug('Trying to freeze "{}"."{}"', table.database, table.name) - create_statement = self._load_create_statement_from_disk(table) - if not create_statement: + + try: + context.ch_ctl.freeze_table(backup_name, table) + except ClickhouseError: + if context.ch_ctl.does_table_exist(table.database, table.name): + raise + logging.warning( - 'Skipping table backup for "{}"."{}". Local metadata is empty or absent', - db.name, + 'Table "{}"."{}" was removed by a user during backup', + table.database, table.name, ) - return None - - # Freeze only MergeTree tables - if not schema_only and is_merge_tree(table.engine): - try: - context.ch_ctl.freeze_table(backup_name, table) - except ClickhouseError: - if context.ch_ctl.does_table_exist(table.database, table.name): - raise - - logging.warning( - 'Table "{}"."{}" was removed by a user during backup', - table.database, - table.name, - ) - return None - - return partial( - self._backup_table_after_freeze, - context, - db, - table, - backup_name, - schema_only, - mtimes, - create_statement, - ) + return False + return True def _backup_table_after_freeze( self, @@ -383,7 +397,7 @@ def _backup_table_after_freeze( schema_only: bool, mtimes: Dict[str, TableMetadataMtime], create_statement: bytes, - ) -> None: + ) -> Optional[UploadPartObserver]: # Check if table metadata was updated new_mtime = self._get_mtime(table.metadata_path) if new_mtime is None or mtimes[table.name].mtime != new_mtime: @@ -392,8 +406,7 @@ def _backup_table_after_freeze( table.database, table.name, ) - context.ch_ctl.remove_freezed_data(backup_name, table) - return + return None logging.debug( 'Performing table backup for "{}"."{}"', table.database, table.name @@ -408,14 +421,15 @@ def _backup_table_after_freeze( ) # Backup table data if not schema_only: - self._backup_frozen_table_data(context, table, backup_name) + return self._backup_frozen_table_data(context, table, backup_name) + return None def _backup_frozen_table_data( self, context: BackupContext, table: Table, backup_name: str, - ) -> None: + ) -> Optional[UploadPartObserver]: """ Backup table with data opposed to schema only. 
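Returns the ``UploadPartObserver`` for this table, or ``None`` when there
is nothing to upload for it, so the caller can validate all uploaded parts
once the whole database has been processed rather than per table.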
""" @@ -426,7 +440,9 @@ def deduplicate_parts_in_batch( frozen_parts: Dict[str, FrozenPart], ) -> None: logging.debug( - "Working on deduplication of {} frozen parts", len(frozen_parts) + 'Working on deduplication of {} frozen parts of "{}"', + len(frozen_parts), + table.name, ) deduplicated_parts = deduplicate_parts( context, table.database, table.name, frozen_parts @@ -458,7 +474,7 @@ def deduplicate_parts_in_batch( table.database, table.name, ) - return + return None logging.debug('Uploading table data for "{}"."{}"', table.database, table.name) @@ -483,11 +499,7 @@ def deduplicate_parts_in_batch( if frozen_parts_batch: deduplicate_parts_in_batch(context, upload_observer, frozen_parts_batch) - context.backup_layout.wait() - - self._validate_uploaded_parts(context, upload_observer.uploaded_parts) - - context.ch_ctl.remove_freezed_data(backup_name, table) + return upload_observer @staticmethod def _validate_uploaded_parts(context: BackupContext, uploaded_parts: list) -> None: diff --git a/tests/integration/ch_backup.featureset b/tests/integration/ch_backup.featureset index ddcb56e8..19f299e1 100644 --- a/tests/integration/ch_backup.featureset +++ b/tests/integration/ch_backup.featureset @@ -12,7 +12,7 @@ features/cloud_storage.feature features/database_engines.feature features/ddl_dictionary.feature features/deduplication.feature -features/freeze_parallel.feature +features/backup_parallel.feature features/incremental_restore.feature features/metadata.feature features/min_interval.feature diff --git a/tests/integration/features/freeze_parallel.feature b/tests/integration/features/backup_parallel.feature similarity index 87% rename from tests/integration/features/freeze_parallel.feature rename to tests/integration/features/backup_parallel.feature index 7ab31028..98098333 100644 --- a/tests/integration/features/freeze_parallel.feature +++ b/tests/integration/features/backup_parallel.feature @@ -1,4 +1,4 @@ -Feature: Parallel freeze +Feature: Parallel freeze and backup Background: Given default configuration @@ -9,11 +9,11 @@ Feature: Parallel freeze And clickhouse on clickhouse01 has test schema with 5 databases and 10 tables And clickhouse01 has test clickhouse data test1 with 5 databases, 10 tables, 100 rows and 5 partitions - Scenario: Create backup with single freeze worker + Scenario: Create backup with single backup thread Given ch-backup configuration on clickhouse01 """ multiprocessing: - freeze_workers: 1 + backup_threads: 1 """ When we create clickhouse01 clickhouse backup Then we got the following backups on clickhouse01 @@ -22,7 +22,7 @@ Feature: Parallel freeze When we restore clickhouse backup #0 to clickhouse02 Then we got same clickhouse data at clickhouse01 clickhouse02 - Scenario: Create backup with default number of freeze workers + Scenario: Create backup with default number of backup threads When we create clickhouse01 clickhouse backup Then we got the following backups on clickhouse01 | num | state | data_count | link_count | title | diff --git a/tests/unit/test_backup_tables.py b/tests/unit/test_backup_tables.py index 2b32cb2b..ce1dfd60 100644 --- a/tests/unit/test_backup_tables.py +++ b/tests/unit/test_backup_tables.py @@ -29,7 +29,7 @@ def test_backup_table_skipping_if_metadata_updated_during_backup( context = BackupContext(DEFAULT_CONFIG) # type: ignore[arg-type] db = Database(db_name, "MergeTree", "/var/lib/clickhouse/metadata/db1.sql") multiprocessing_conf: Dict[str, int] = DEFAULT_CONFIG.get("multiprocessing") # type: ignore[assignment] - table_backup = 
TableBackup(multiprocessing_conf.get("freeze_workers", 1)) + table_backup = TableBackup(multiprocessing_conf.get("backup_threads", 1)) backup_meta = BackupMetadata( name="20181017T210300", path="ch_backup/20181017T210300", From 69331630421c1f62bc00326e56ca6acacbd02f1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Tue, 2 Jul 2024 02:25:37 +0300 Subject: [PATCH 14/24] Fix unit test - can't pickle lock --- tests/unit/test_backup_tables.py | 2 +- tests/unit/test_upload_part_observer.py | 22 ++++++++++++---------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/tests/unit/test_backup_tables.py b/tests/unit/test_backup_tables.py index ce1dfd60..4c69f309 100644 --- a/tests/unit/test_backup_tables.py +++ b/tests/unit/test_backup_tables.py @@ -71,4 +71,4 @@ def test_backup_table_skipping_if_metadata_updated_during_backup( assert len(context.backup_meta.get_tables(db_name)) == backups_expected # One call after each table and one after database is backuped - assert clickhouse_ctl_mock.remove_freezed_data.call_count == 2 + assert clickhouse_ctl_mock.remove_freezed_data.call_count == 1 diff --git a/tests/unit/test_upload_part_observer.py b/tests/unit/test_upload_part_observer.py index f5f928a1..d2aa0aa2 100644 --- a/tests/unit/test_upload_part_observer.py +++ b/tests/unit/test_upload_part_observer.py @@ -1,4 +1,3 @@ -import copy from typing import List from unittest.mock import Mock, patch @@ -16,17 +15,20 @@ TABLE_NAME = "test_table" ENGINE = "MergeTree" BACKUP_NAME = "TestBackup" -BACKUP_META = BackupMetadata( - name=BACKUP_NAME, - path=f"ch_backup/{BACKUP_NAME}", - version="1.0.100", - ch_version="19.1.16", - time_format="%Y-%m-%dT%H:%M:%S%Z", - hostname="clickhouse01.test_net_711", -) DB = Database(DB_NAME, ENGINE, f"/var/lib/clickhouse/metadata/{DB_NAME}.sql") +def create_backup_meta() -> BackupMetadata: + return BackupMetadata( + name=BACKUP_NAME, + path=f"ch_backup/{BACKUP_NAME}", + version="1.0.100", + ch_version="19.1.16", + time_format="%Y-%m-%dT%H:%M:%S%Z", + hostname="clickhouse01.test_net_711", + ) + + @parametrize( { "id": "One part before interval", @@ -82,7 +84,7 @@ def test_observer( ) -> None: config = {"backup": {"update_metadata_interval": interval}} - backup_meta = copy.deepcopy(BACKUP_META) + backup_meta = create_backup_meta() backup_meta.add_database(DB) context = BackupContext(config) # type: ignore[arg-type] From d0a25c9a7987c886367be25f192fbdba13964949 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Mon, 8 Jul 2024 18:56:39 +0300 Subject: [PATCH 15/24] Revert "Fix unit test - can't pickle lock" This reverts commit 69331630421c1f62bc00326e56ca6acacbd02f1a. 
--- tests/unit/test_backup_tables.py | 2 +- tests/unit/test_upload_part_observer.py | 22 ++++++++++------------ 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/tests/unit/test_backup_tables.py b/tests/unit/test_backup_tables.py index 4c69f309..ce1dfd60 100644 --- a/tests/unit/test_backup_tables.py +++ b/tests/unit/test_backup_tables.py @@ -71,4 +71,4 @@ def test_backup_table_skipping_if_metadata_updated_during_backup( assert len(context.backup_meta.get_tables(db_name)) == backups_expected # One call after each table and one after database is backuped - assert clickhouse_ctl_mock.remove_freezed_data.call_count == 1 + assert clickhouse_ctl_mock.remove_freezed_data.call_count == 2 diff --git a/tests/unit/test_upload_part_observer.py b/tests/unit/test_upload_part_observer.py index d2aa0aa2..f5f928a1 100644 --- a/tests/unit/test_upload_part_observer.py +++ b/tests/unit/test_upload_part_observer.py @@ -1,3 +1,4 @@ +import copy from typing import List from unittest.mock import Mock, patch @@ -15,20 +16,17 @@ TABLE_NAME = "test_table" ENGINE = "MergeTree" BACKUP_NAME = "TestBackup" +BACKUP_META = BackupMetadata( + name=BACKUP_NAME, + path=f"ch_backup/{BACKUP_NAME}", + version="1.0.100", + ch_version="19.1.16", + time_format="%Y-%m-%dT%H:%M:%S%Z", + hostname="clickhouse01.test_net_711", +) DB = Database(DB_NAME, ENGINE, f"/var/lib/clickhouse/metadata/{DB_NAME}.sql") -def create_backup_meta() -> BackupMetadata: - return BackupMetadata( - name=BACKUP_NAME, - path=f"ch_backup/{BACKUP_NAME}", - version="1.0.100", - ch_version="19.1.16", - time_format="%Y-%m-%dT%H:%M:%S%Z", - hostname="clickhouse01.test_net_711", - ) - - @parametrize( { "id": "One part before interval", @@ -84,7 +82,7 @@ def test_observer( ) -> None: config = {"backup": {"update_metadata_interval": interval}} - backup_meta = create_backup_meta() + backup_meta = copy.deepcopy(BACKUP_META) backup_meta.add_database(DB) context = BackupContext(config) # type: ignore[arg-type] From 827a66dccbb871da3e4e0c942deb2149b8c94833 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Mon, 8 Jul 2024 18:56:45 +0300 Subject: [PATCH 16/24] Revert "Parallel backup" This reverts commit 0214f1ef7e18c76437d45d7e023b607b5478312c. 
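
Dropping the parallel backup path also drops the Lock that made
BackupMetadata safe to update from several threads: the size accounting in
add_table/add_part is a read-modify-write sequence, so without
serialization two workers can interleave on "self.size += part.size" and
lose an update. A simplified sketch of the pattern being removed
(illustrative, not the project code):

    import threading

    class Meta:
        def __init__(self) -> None:
            self.size = 0
            self._lock = threading.Lock()

        def add_part_size(self, part_size: int) -> None:
            with self._lock:  # serialize concurrent size updates
                self.size += part_size

With table backup single-threaded again, the lock is unnecessary, which is
why the revert can remove it together with the thread pools.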
--- ch_backup/backup/metadata/backup_metadata.py | 27 ++-- ch_backup/ch_backup.py | 2 +- ch_backup/clickhouse/control.py | 25 ++-- ch_backup/config.py | 4 +- ch_backup/logic/table.py | 136 ++++++++---------- tests/integration/ch_backup.featureset | 2 +- ...rallel.feature => freeze_parallel.feature} | 8 +- tests/unit/test_backup_tables.py | 2 +- 8 files changed, 94 insertions(+), 112 deletions(-) rename tests/integration/features/{backup_parallel.feature => freeze_parallel.feature} (87%) diff --git a/ch_backup/backup/metadata/backup_metadata.py b/ch_backup/backup/metadata/backup_metadata.py index a686345a..546ec6da 100644 --- a/ch_backup/backup/metadata/backup_metadata.py +++ b/ch_backup/backup/metadata/backup_metadata.py @@ -6,7 +6,6 @@ import socket from datetime import datetime, timezone from enum import Enum -from threading import Lock from typing import Any, Dict, List, Optional, Sequence from ch_backup.backup.metadata.access_control_metadata import AccessControlMetadata @@ -71,8 +70,6 @@ def __init__( self._user_defined_functions: List[str] = [] self._named_collections: List[str] = [] - self._lock = Lock() - def __str__(self) -> str: return self.dump_json() @@ -267,17 +264,16 @@ def add_table(self, table: TableMetadata) -> None: """ Add table to backup metadata. """ - with self._lock: - tables = self._databases[table.database]["tables"] + tables = self._databases[table.database]["tables"] - assert table.name not in tables + assert table.name not in tables - tables[table.name] = table.raw_metadata + tables[table.name] = table.raw_metadata - for part in table.get_parts(): - self.size += part.size - if not part.link: - self.real_size += part.size + for part in table.get_parts(): + self.size += part.size + if not part.link: + self.real_size += part.size def add_udf(self, udf_name: str) -> None: """ @@ -332,12 +328,11 @@ def add_part(self, part: PartMetadata) -> None: """ Add data part to backup metadata. 
""" - with self._lock: - self.get_table(part.database, part.table).add_part(part) + self.get_table(part.database, part.table).add_part(part) - self.size += part.size - if not part.link: - self.real_size += part.size + self.size += part.size + if not part.link: + self.real_size += part.size def remove_parts(self, table: TableMetadata, parts: List[PartMetadata]) -> None: """ diff --git a/ch_backup/ch_backup.py b/ch_backup/ch_backup.py index ff745b85..f2f514fd 100644 --- a/ch_backup/ch_backup.py +++ b/ch_backup/ch_backup.py @@ -49,7 +49,7 @@ def __init__(self, config: Config) -> None: self._access_backup_manager = AccessBackup() self._database_backup_manager = DatabaseBackup() self._table_backup_manager = TableBackup( - self._config.get("multiprocessing").get("backup_threads", 1) + self._config.get("multiprocessing").get("freeze_workers", 1) ) self._udf_backup_manager = UDFBackup() self._nc_backup_manager = NamedCollectionsBackup() diff --git a/ch_backup/clickhouse/control.py b/ch_backup/clickhouse/control.py index 34a8b422..10714154 100644 --- a/ch_backup/clickhouse/control.py +++ b/ch_backup/clickhouse/control.py @@ -210,12 +210,9 @@ CREATE TABLE IF NOT EXISTS `{system_db}`._deduplication_info_current ( name String, checksum String, - database String, - table String, ) ENGINE = MergeTree() ORDER BY (name, checksum) - PARTITION BY (database, table) """ ) @@ -957,13 +954,12 @@ def create_deduplication_table(self): system_db=escape(self._backup_config["system_database"]) ) ) - for table_name in ["_deduplication_info", "_deduplication_info_current"]: - self._ch_client.query( - TRUNCATE_TABLE_IF_EXISTS_SQL.format( - db_name=escape(self._backup_config["system_database"]), - table_name=table_name, - ) + self._ch_client.query( + TRUNCATE_TABLE_IF_EXISTS_SQL.format( + db_name=escape(self._backup_config["system_database"]), + table_name="_deduplication_info", ) + ) self._ch_client.query( CREATE_IF_NOT_EXISTS_DEDUP_TABLE_SQL.format( system_db=escape(self._backup_config["system_database"]) @@ -989,16 +985,19 @@ def get_deduplication_info( """ Get deduplication info for given frozen parts of a table """ + self._ch_client.query( + TRUNCATE_TABLE_IF_EXISTS_SQL.format( + db_name=escape(self._backup_config["system_database"]), + table_name="_deduplication_info_current", + ) + ) self._ch_client.query( CREATE_IF_NOT_EXISTS_DEDUP_TABLE_CURRENT_SQL.format( system_db=escape(self._backup_config["system_database"]) ) ) - batch = [ - f"('{part.name}','{part.checksum}','{database}','{table}')" - for part in frozen_parts.values() - ] + batch = [f"('{part.name}','{part.checksum}')" for part in frozen_parts.values()] self._ch_client.query( INSERT_DEDUP_INFO_BATCH_SQL.format( system_db=escape(self._backup_config["system_database"]), diff --git a/ch_backup/config.py b/ch_backup/config.py index 4e4c3eaa..40484f67 100644 --- a/ch_backup/config.py +++ b/ch_backup/config.py @@ -156,8 +156,8 @@ def _as_seconds(t: str) -> int: "workers": 4, # The number of processes for parts restoring from S3 disks. 
"cloud_storage_restore_workers": 4, - # The number of threads for parallel freeze and backup of tables - "backup_threads": 4, + # The number of processes for parallel freeze of tables + "freeze_workers": 4, }, "pipeline": { # Is asynchronous pipelines used (based on Pypeln library) diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index 4bd97578..0e6f07e2 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -4,12 +4,12 @@ import os from collections import deque -from concurrent.futures import Future, ThreadPoolExecutor, as_completed +from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from functools import partial from itertools import chain from pathlib import Path -from typing import Dict, Iterable, List, Optional, Sequence, Tuple +from typing import Callable, Dict, Iterable, List, Optional, Sequence, Tuple from ch_backup import logging from ch_backup.backup.deduplication import deduplicate_parts @@ -50,8 +50,8 @@ class TableBackup(BackupManager): Table backup class """ - def __init__(self, backup_threads: int = 1): - self._backup_threads = backup_threads + def __init__(self, freeze_workers: int = 1): + self._freeze_workers = freeze_workers def backup( self, @@ -125,10 +125,9 @@ def _backup( context.ch_ctl.create_shadow_increment() with ThreadPoolExecutor( - max_workers=self._backup_threads - ) as backup_executor: - backup_futures: List[Future] = [] - upload_observers: List[UploadPartObserver] = [] + max_workers=self._freeze_workers + ) as freeze_executor: + freeze_futures = [] for table in context.ch_ctl.get_tables(db.name, tables): if table.name not in mtimes: @@ -140,9 +139,9 @@ def _backup( table.name, ) - backup_futures.append( - backup_executor.submit( - self._backup_table, + freeze_futures.append( + freeze_executor.submit( + self._freeze_table, context, db, table, @@ -152,51 +151,15 @@ def _backup( ) ) - for backup_future in as_completed(backup_futures): - upload_observer = backup_future.result() - if upload_observer: - upload_observers.append(upload_observer) - - context.backup_layout.wait() - - logging.debug( - 'All tables from "{}" are processed, validating uploaded parts.', - db.name, - ) - for upload_observer in upload_observers: - self._validate_uploaded_parts( - context, upload_observer.uploaded_parts - ) + for freeze_future in as_completed(freeze_futures): + backup_freezed_table = freeze_future.result() + if backup_freezed_table is not None: + backup_freezed_table() context.ch_ctl.remove_freezed_data() context.backup_layout.upload_backup_metadata(context.backup_meta) - def _backup_table( - self, - context: BackupContext, - db: Database, - table: Table, - backup_name: str, - schema_only: bool, - mtimes: Dict[str, TableMetadataMtime], - ) -> Optional[UploadPartObserver]: - create_statement = self._load_create_statement_from_disk(table) - if not create_statement: - logging.warning( - 'Skipping table backup for "{}"."{}". 
Local metadata is empty or absent', - db.name, - table.name, - ) - return None - # Freeze only MergeTree tables - if not schema_only and is_merge_tree(table.engine): - if not self._freeze_table(context, table, backup_name): - return None - return self._backup_table_after_freeze( - context, db, table, backup_name, schema_only, mtimes, create_statement - ) - @staticmethod def _backup_cloud_storage_metadata(context: BackupContext) -> None: """ @@ -366,27 +329,50 @@ def _load_create_statement_from_disk(table: Table) -> Optional[bytes]: def _freeze_table( self, context: BackupContext, + db: Database, table: Table, backup_name: str, - ) -> bool: + schema_only: bool, + mtimes: Dict[str, TableMetadataMtime], + ) -> Optional[Callable]: """ - Freeze table. + Freeze table and return function which backups freezed table. """ logging.debug('Trying to freeze "{}"."{}"', table.database, table.name) - - try: - context.ch_ctl.freeze_table(backup_name, table) - except ClickhouseError: - if context.ch_ctl.does_table_exist(table.database, table.name): - raise - + create_statement = self._load_create_statement_from_disk(table) + if not create_statement: logging.warning( - 'Table "{}"."{}" was removed by a user during backup', - table.database, + 'Skipping table backup for "{}"."{}". Local metadata is empty or absent', + db.name, table.name, ) - return False - return True + return None + + # Freeze only MergeTree tables + if not schema_only and is_merge_tree(table.engine): + try: + context.ch_ctl.freeze_table(backup_name, table) + except ClickhouseError: + if context.ch_ctl.does_table_exist(table.database, table.name): + raise + + logging.warning( + 'Table "{}"."{}" was removed by a user during backup', + table.database, + table.name, + ) + return None + + return partial( + self._backup_table_after_freeze, + context, + db, + table, + backup_name, + schema_only, + mtimes, + create_statement, + ) def _backup_table_after_freeze( self, @@ -397,7 +383,7 @@ def _backup_table_after_freeze( schema_only: bool, mtimes: Dict[str, TableMetadataMtime], create_statement: bytes, - ) -> Optional[UploadPartObserver]: + ) -> None: # Check if table metadata was updated new_mtime = self._get_mtime(table.metadata_path) if new_mtime is None or mtimes[table.name].mtime != new_mtime: @@ -406,7 +392,8 @@ def _backup_table_after_freeze( table.database, table.name, ) - return None + context.ch_ctl.remove_freezed_data(backup_name, table) + return logging.debug( 'Performing table backup for "{}"."{}"', table.database, table.name @@ -421,15 +408,14 @@ def _backup_table_after_freeze( ) # Backup table data if not schema_only: - return self._backup_frozen_table_data(context, table, backup_name) - return None + self._backup_frozen_table_data(context, table, backup_name) def _backup_frozen_table_data( self, context: BackupContext, table: Table, backup_name: str, - ) -> Optional[UploadPartObserver]: + ) -> None: """ Backup table with data opposed to schema only. 
""" @@ -440,9 +426,7 @@ def deduplicate_parts_in_batch( frozen_parts: Dict[str, FrozenPart], ) -> None: logging.debug( - 'Working on deduplication of {} frozen parts of "{}"', - len(frozen_parts), - table.name, + "Working on deduplication of {} frozen parts", len(frozen_parts) ) deduplicated_parts = deduplicate_parts( context, table.database, table.name, frozen_parts @@ -474,7 +458,7 @@ def deduplicate_parts_in_batch( table.database, table.name, ) - return None + return logging.debug('Uploading table data for "{}"."{}"', table.database, table.name) @@ -499,7 +483,11 @@ def deduplicate_parts_in_batch( if frozen_parts_batch: deduplicate_parts_in_batch(context, upload_observer, frozen_parts_batch) - return upload_observer + context.backup_layout.wait() + + self._validate_uploaded_parts(context, upload_observer.uploaded_parts) + + context.ch_ctl.remove_freezed_data(backup_name, table) @staticmethod def _validate_uploaded_parts(context: BackupContext, uploaded_parts: list) -> None: diff --git a/tests/integration/ch_backup.featureset b/tests/integration/ch_backup.featureset index 19f299e1..ddcb56e8 100644 --- a/tests/integration/ch_backup.featureset +++ b/tests/integration/ch_backup.featureset @@ -12,7 +12,7 @@ features/cloud_storage.feature features/database_engines.feature features/ddl_dictionary.feature features/deduplication.feature -features/backup_parallel.feature +features/freeze_parallel.feature features/incremental_restore.feature features/metadata.feature features/min_interval.feature diff --git a/tests/integration/features/backup_parallel.feature b/tests/integration/features/freeze_parallel.feature similarity index 87% rename from tests/integration/features/backup_parallel.feature rename to tests/integration/features/freeze_parallel.feature index 98098333..7ab31028 100644 --- a/tests/integration/features/backup_parallel.feature +++ b/tests/integration/features/freeze_parallel.feature @@ -1,4 +1,4 @@ -Feature: Parallel freeze and backup +Feature: Parallel freeze Background: Given default configuration @@ -9,11 +9,11 @@ Feature: Parallel freeze and backup And clickhouse on clickhouse01 has test schema with 5 databases and 10 tables And clickhouse01 has test clickhouse data test1 with 5 databases, 10 tables, 100 rows and 5 partitions - Scenario: Create backup with single backup thread + Scenario: Create backup with single freeze worker Given ch-backup configuration on clickhouse01 """ multiprocessing: - backup_threads: 1 + freeze_workers: 1 """ When we create clickhouse01 clickhouse backup Then we got the following backups on clickhouse01 @@ -22,7 +22,7 @@ Feature: Parallel freeze and backup When we restore clickhouse backup #0 to clickhouse02 Then we got same clickhouse data at clickhouse01 clickhouse02 - Scenario: Create backup with default number of backup threads + Scenario: Create backup with default number of freeze workers When we create clickhouse01 clickhouse backup Then we got the following backups on clickhouse01 | num | state | data_count | link_count | title | diff --git a/tests/unit/test_backup_tables.py b/tests/unit/test_backup_tables.py index ce1dfd60..2b32cb2b 100644 --- a/tests/unit/test_backup_tables.py +++ b/tests/unit/test_backup_tables.py @@ -29,7 +29,7 @@ def test_backup_table_skipping_if_metadata_updated_during_backup( context = BackupContext(DEFAULT_CONFIG) # type: ignore[arg-type] db = Database(db_name, "MergeTree", "/var/lib/clickhouse/metadata/db1.sql") multiprocessing_conf: Dict[str, int] = DEFAULT_CONFIG.get("multiprocessing") # type: ignore[assignment] - 
table_backup = TableBackup(multiprocessing_conf.get("backup_threads", 1)) + table_backup = TableBackup(multiprocessing_conf.get("freeze_workers", 1)) backup_meta = BackupMetadata( name="20181017T210300", path="ch_backup/20181017T210300", From 93da739a2e8709c465e094f04cd21e03f22cf490 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Mon, 8 Jul 2024 19:12:58 +0300 Subject: [PATCH 17/24] Backup tables after all tables are frozen --- ch_backup/ch_backup.py | 2 +- ch_backup/config.py | 4 ++-- ch_backup/logic/table.py | 16 +++++++++++----- .../integration/features/freeze_parallel.feature | 2 +- tests/unit/test_backup_tables.py | 2 +- 5 files changed, 16 insertions(+), 10 deletions(-) diff --git a/ch_backup/ch_backup.py b/ch_backup/ch_backup.py index f2f514fd..7175aec7 100644 --- a/ch_backup/ch_backup.py +++ b/ch_backup/ch_backup.py @@ -49,7 +49,7 @@ def __init__(self, config: Config) -> None: self._access_backup_manager = AccessBackup() self._database_backup_manager = DatabaseBackup() self._table_backup_manager = TableBackup( - self._config.get("multiprocessing").get("freeze_workers", 1) + self._config.get("multiprocessing").get("freeze_threads", 1) ) self._udf_backup_manager = UDFBackup() self._nc_backup_manager = NamedCollectionsBackup() diff --git a/ch_backup/config.py b/ch_backup/config.py index 40484f67..1658096f 100644 --- a/ch_backup/config.py +++ b/ch_backup/config.py @@ -156,8 +156,8 @@ def _as_seconds(t: str) -> int: "workers": 4, # The number of processes for parts restoring from S3 disks. "cloud_storage_restore_workers": 4, - # The number of processes for parallel freeze of tables - "freeze_workers": 4, + # The number of threads for parallel freeze of tables + "freeze_threads": 4, }, "pipeline": { # Is asynchronous pipelines used (based on Pypeln library) diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index 0e6f07e2..f6536099 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -50,8 +50,8 @@ class TableBackup(BackupManager): Table backup class """ - def __init__(self, freeze_workers: int = 1): - self._freeze_workers = freeze_workers + def __init__(self, freeze_threads: int = 1): + self._freeze_threads = freeze_threads def backup( self, @@ -125,16 +125,17 @@ def _backup( context.ch_ctl.create_shadow_increment() with ThreadPoolExecutor( - max_workers=self._freeze_workers + max_workers=self._freeze_threads ) as freeze_executor: freeze_futures = [] + backup_functions = [] for table in context.ch_ctl.get_tables(db.name, tables): if table.name not in mtimes: continue logging.debug( - 'Adding "{}"."{}" to the freeze and backup queue', + 'Adding "{}"."{}" to the freeze queue', table.database, table.name, ) @@ -154,7 +155,12 @@ def _backup( for freeze_future in as_completed(freeze_futures): backup_freezed_table = freeze_future.result() if backup_freezed_table is not None: - backup_freezed_table() + backup_functions.append(backup_freezed_table) + + logging.debug("All tables from {} are frozen", db.name) + + for backup_freezed_table in backup_functions: + backup_freezed_table() context.ch_ctl.remove_freezed_data() diff --git a/tests/integration/features/freeze_parallel.feature b/tests/integration/features/freeze_parallel.feature index 7ab31028..4505964e 100644 --- a/tests/integration/features/freeze_parallel.feature +++ b/tests/integration/features/freeze_parallel.feature @@ -13,7 +13,7 @@ Feature: Parallel freeze Given ch-backup configuration on clickhouse01 """ 
multiprocessing: - freeze_workers: 1 + freeze_threads: 1 """ When we create clickhouse01 clickhouse backup Then we got the following backups on clickhouse01 diff --git a/tests/unit/test_backup_tables.py b/tests/unit/test_backup_tables.py index 2b32cb2b..f3f77d49 100644 --- a/tests/unit/test_backup_tables.py +++ b/tests/unit/test_backup_tables.py @@ -29,7 +29,7 @@ def test_backup_table_skipping_if_metadata_updated_during_backup( context = BackupContext(DEFAULT_CONFIG) # type: ignore[arg-type] db = Database(db_name, "MergeTree", "/var/lib/clickhouse/metadata/db1.sql") multiprocessing_conf: Dict[str, int] = DEFAULT_CONFIG.get("multiprocessing") # type: ignore[assignment] - table_backup = TableBackup(multiprocessing_conf.get("freeze_workers", 1)) + table_backup = TableBackup(multiprocessing_conf.get("freeze_threads", 1)) backup_meta = BackupMetadata( name="20181017T210300", path="ch_backup/20181017T210300", From b7a43582f970efb0023a4772e07ee949c58a5495 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Mon, 8 Jul 2024 20:02:13 +0300 Subject: [PATCH 18/24] Move thread pool related logic to separate class --- ch_backup/logic/table.py | 122 +++++++++++++++++++++++++-------------- 1 file changed, 79 insertions(+), 43 deletions(-) diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index f6536099..303e994f 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -4,12 +4,12 @@ import os from collections import deque -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import Future, ThreadPoolExecutor, as_completed from dataclasses import dataclass from functools import partial from itertools import chain from pathlib import Path -from typing import Callable, Dict, Iterable, List, Optional, Sequence, Tuple +from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple from ch_backup import logging from ch_backup.backup.deduplication import deduplicate_parts @@ -35,6 +35,50 @@ from ch_backup.util import compare_schema, get_table_zookeeper_paths +class TableFreezePool: + """ + Class to freeze tables in parallel. + """ + + def __init__(self, threads: int): + self._futures: List[Future] = [] + self._thread_pool: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=threads) + + def __enter__(self): + return self + + def __exit__(self, *exc: Any) -> bool: + self.shutdown() + return True + + def submit(self, fn: Callable, *args: Any, **kwargs: Any) -> None: + """ + Submit freeze task to the executor. + Task is expected to return callable for table backup. + """ + result = self._thread_pool.submit(fn, *args, **kwargs) + self._futures.append(result) + + def wait(self) -> List[Callable]: + """ + Wait for all submitted tables to freeze and return callables to backup frozen tables. + """ + result = [] + for future in as_completed(self._futures): + backup_freezed_table = future.result() + if backup_freezed_table is not None: + result.append(backup_freezed_table) + self._futures.clear() + return result + + def shutdown(self): + """ + Clean-up the resources associated with the thread pool. 
+ """ + self._thread_pool.shutdown(wait=False) + self._futures.clear() + + @dataclass class TableMetadataMtime: """ @@ -66,14 +110,16 @@ def backup( backup_name = context.backup_meta.get_sanitized_name() - for db in databases: - self._backup( - context, - db, - db_tables[db.name], - backup_name, - schema_only, - ) + with TableFreezePool(self._freeze_threads) as freeze_pool: + for db in databases: + self._backup( + freeze_pool, + context, + db, + db_tables[db.name], + backup_name, + schema_only, + ) self._backup_cloud_storage_metadata(context) @@ -104,6 +150,7 @@ def _collect_local_metadata_mtime( def _backup( self, + freeze_pool: TableFreezePool, context: BackupContext, db: Database, tables: Sequence[str], @@ -124,45 +171,34 @@ def _backup( # race condition with parallel freeze context.ch_ctl.create_shadow_increment() - with ThreadPoolExecutor( - max_workers=self._freeze_threads - ) as freeze_executor: - freeze_futures = [] - backup_functions = [] - - for table in context.ch_ctl.get_tables(db.name, tables): - if table.name not in mtimes: - continue + for table in context.ch_ctl.get_tables(db.name, tables): + if table.name not in mtimes: + continue - logging.debug( - 'Adding "{}"."{}" to the freeze queue', - table.database, - table.name, - ) + logging.debug( + 'Adding "{}"."{}" to the freeze queue', + table.database, + table.name, + ) - freeze_futures.append( - freeze_executor.submit( - self._freeze_table, - context, - db, - table, - backup_name, - schema_only, - mtimes, - ) - ) + freeze_pool.submit( + self._freeze_table, + context, + db, + table, + backup_name, + schema_only, + mtimes, + ) - for freeze_future in as_completed(freeze_futures): - backup_freezed_table = freeze_future.result() - if backup_freezed_table is not None: - backup_functions.append(backup_freezed_table) + backup_functions = freeze_pool.wait() - logging.debug("All tables from {} are frozen", db.name) + logging.debug("All tables from {} are frozen", db.name) - for backup_freezed_table in backup_functions: - backup_freezed_table() + for backup_freezed_table in backup_functions: + backup_freezed_table() - context.ch_ctl.remove_freezed_data() + context.ch_ctl.remove_freezed_data() context.backup_layout.upload_backup_metadata(context.backup_meta) From 6d1076a4a4b1b749d5bcf8bf971aaf30ff677afa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Mon, 8 Jul 2024 20:43:48 +0300 Subject: [PATCH 19/24] Move all freeze related logic to TableFreezer --- ch_backup/logic/table.py | 203 ++++++++++++++++++++------------------- 1 file changed, 104 insertions(+), 99 deletions(-) diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index 303e994f..06cce754 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -9,7 +9,7 @@ from functools import partial from itertools import chain from pathlib import Path -from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple +from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple from ch_backup import logging from ch_backup.backup.deduplication import deduplicate_parts @@ -35,7 +35,7 @@ from ch_backup.util import compare_schema, get_table_zookeeper_paths -class TableFreezePool: +class TableFreezer: """ Class to freeze tables in parallel. 
""" @@ -48,20 +48,13 @@ def __enter__(self): return self def __exit__(self, *exc: Any) -> bool: - self.shutdown() + self._thread_pool.shutdown(wait=False) + self._futures.clear() return True - def submit(self, fn: Callable, *args: Any, **kwargs: Any) -> None: - """ - Submit freeze task to the executor. - Task is expected to return callable for table backup. - """ - result = self._thread_pool.submit(fn, *args, **kwargs) - self._futures.append(result) - - def wait(self) -> List[Callable]: + def wait(self) -> List[Tuple[Table, bytes]]: """ - Wait for all submitted tables to freeze and return callables to backup frozen tables. + Wait for all submitted tables to freeze and return create statements. """ result = [] for future in as_completed(self._futures): @@ -71,12 +64,88 @@ def wait(self) -> List[Callable]: self._futures.clear() return result - def shutdown(self): + def freeze_table( + self, + context: BackupContext, + db: Database, + table: Table, + backup_name: str, + schema_only: bool, + ) -> None: """ - Clean-up the resources associated with the thread pool. + Submit table to the freeze pool. """ - self._thread_pool.shutdown(wait=False) - self._futures.clear() + future = self._thread_pool.submit( + self._freeze_table, + context, + db, + table, + backup_name, + schema_only, + ) + self._futures.append(future) + + def _freeze_table( + self, + context: BackupContext, + db: Database, + table: Table, + backup_name: str, + schema_only: bool, + ) -> Optional[Tuple[Table, bytes]]: + """ + Freeze table and return it's create statement + """ + logging.debug('Trying to freeze "{}"."{}"', table.database, table.name) + + create_statement = self._load_create_statement_from_disk(table) + if not create_statement: + logging.warning( + 'Skipping table backup for "{}"."{}". Local metadata is empty or absent', + db.name, + table.name, + ) + return None + + # Freeze only MergeTree tables + if not schema_only and is_merge_tree(table.engine): + try: + context.ch_ctl.freeze_table(backup_name, table) + except ClickhouseError: + if context.ch_ctl.does_table_exist(table.database, table.name): + raise + + logging.warning( + 'Table "{}"."{}" was removed by a user during backup', + table.database, + table.name, + ) + return None + + return (table, create_statement) + + @staticmethod + def _load_create_statement_from_disk(table: Table) -> Optional[bytes]: + """ + Load a create statement of the table from a metadata file on the disk. + """ + if not table.metadata_path: + logging.debug( + 'Cannot load a create statement of the table "{}"."{}". 
Metadata is empty',
+                table.database,
+                table.name,
+            )
+            return None
+        try:
+            return Path(table.metadata_path).read_bytes()
+        except OSError as e:
+            logging.debug(
+                'Cannot load a create statement of the table "{}"."{}": {}',
+                table.database,
+                table.name,
+                str(e),
+            )
+            return None
 
 
 @dataclass
 class TableMetadataMtime:
@@ -110,10 +179,10 @@ def backup(
 
         backup_name = context.backup_meta.get_sanitized_name()
 
-        with TableFreezePool(self._freeze_threads) as freeze_pool:
+        with TableFreezer(self._freeze_threads) as table_freezer:
             for db in databases:
                 self._backup(
-                    freeze_pool,
+                    table_freezer,
                     context,
                     db,
                     db_tables[db.name],
@@ -150,7 +219,7 @@ def _collect_local_metadata_mtime(
 
     def _backup(
         self,
-        freeze_pool: TableFreezePool,
+        table_freezer: TableFreezer,
         context: BackupContext,
         db: Database,
         tables: Sequence[str],
@@ -181,22 +250,29 @@ def _backup(
                 table.name,
             )
 
-            freeze_pool.submit(
-                self._freeze_table,
+            table_freezer.freeze_table(
                 context,
                 db,
                 table,
                 backup_name,
                 schema_only,
-                mtimes,
             )
 
-        backup_functions = freeze_pool.wait()
+        # Wait until all tables are frozen
+        freezed_tables = table_freezer.wait()
 
-        logging.debug("All tables from {} are frozen", db.name)
+        logging.debug('All tables from "{}" are frozen', db.name)
 
-        for backup_freezed_table in backup_functions:
-            backup_freezed_table()
+        for table, create_statement in freezed_tables:
+            self._backup_freezed_table(
+                context,
+                db,
+                table,
+                backup_name,
+                schema_only,
+                mtimes,
+                create_statement,
+            )
 
         context.ch_ctl.remove_freezed_data()
 
         context.backup_layout.upload_backup_metadata(context.backup_meta)
@@ -345,78 +421,7 @@ def restore(
             keep_going=keep_going,
         )
 
-    @staticmethod
-    def _load_create_statement_from_disk(table: Table) -> Optional[bytes]:
-        """
-        Load a create statement of the table from a metadata file on the disk.
-        """
-        if not table.metadata_path:
-            logging.debug(
-                'Cannot load a create statement of the table "{}"."{}". Metadata is empty',
-                table.database,
-                table.name,
-            )
-            return None
-        try:
-            return Path(table.metadata_path).read_bytes()
-        except OSError as e:
-            logging.debug(
-                'Cannot load a create statement of the table "{}"."{}": {}',
-                table.database,
-                table.name,
-                str(e),
-            )
-            return None
-
-    def _freeze_table(
-        self,
-        context: BackupContext,
-        db: Database,
-        table: Table,
-        backup_name: str,
-        schema_only: bool,
-        mtimes: Dict[str, TableMetadataMtime],
-    ) -> Optional[Callable]:
-        """
-        Freeze table and return function which backups freezed table.
-        """
-        logging.debug('Trying to freeze "{}"."{}"', table.database, table.name)
-        create_statement = self._load_create_statement_from_disk(table)
-        if not create_statement:
-            logging.warning(
-                'Skipping table backup for "{}"."{}". 
Local metadata is empty or absent', - db.name, - table.name, - ) - return None - - # Freeze only MergeTree tables - if not schema_only and is_merge_tree(table.engine): - try: - context.ch_ctl.freeze_table(backup_name, table) - except ClickhouseError: - if context.ch_ctl.does_table_exist(table.database, table.name): - raise - - logging.warning( - 'Table "{}"."{}" was removed by a user during backup', - table.database, - table.name, - ) - return None - - return partial( - self._backup_table_after_freeze, - context, - db, - table, - backup_name, - schema_only, - mtimes, - create_statement, - ) - - def _backup_table_after_freeze( + def _backup_freezed_table( self, context: BackupContext, db: Database, From c70ad1fc0405b775e04b075b1fed00331cd2ecd0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Mon, 8 Jul 2024 20:52:17 +0300 Subject: [PATCH 20/24] Small fixes --- ch_backup/ch_backup.py | 2 +- ch_backup/logic/table.py | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/ch_backup/ch_backup.py b/ch_backup/ch_backup.py index 7175aec7..8c4db4ac 100644 --- a/ch_backup/ch_backup.py +++ b/ch_backup/ch_backup.py @@ -49,7 +49,7 @@ def __init__(self, config: Config) -> None: self._access_backup_manager = AccessBackup() self._database_backup_manager = DatabaseBackup() self._table_backup_manager = TableBackup( - self._config.get("multiprocessing").get("freeze_threads", 1) + self._config["multiprocessing"]["freeze_threads"] ) self._udf_backup_manager = UDFBackup() self._nc_backup_manager = NamedCollectionsBackup() diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index 06cce754..6f509fa4 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -75,6 +75,9 @@ def freeze_table( """ Submit table to the freeze pool. 
""" + # Create shadow/increment.txt if not exists manually to avoid + # race condition with parallel freeze + context.ch_ctl.create_shadow_increment() future = self._thread_pool.submit( self._freeze_table, context, @@ -236,10 +239,6 @@ def _backup( # See https://en.wikipedia.org/wiki/Optimistic_concurrency_control mtimes = self._collect_local_metadata_mtime(context, db, tables) - # Create shadow/increment.txt if not exists manually to avoid - # race condition with parallel freeze - context.ch_ctl.create_shadow_increment() - for table in context.ch_ctl.get_tables(db.name, tables): if table.name not in mtimes: continue From 190539cd37f35ec88383116dc69612927d5b3334 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Tue, 9 Jul 2024 02:42:49 +0300 Subject: [PATCH 21/24] Fix exception suppression in TableFreezer's __exit__ --- ch_backup/logic/table.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index 6f509fa4..e0cdd36f 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -50,7 +50,7 @@ def __enter__(self): def __exit__(self, *exc: Any) -> bool: self._thread_pool.shutdown(wait=False) self._futures.clear() - return True + return False def wait(self) -> List[Tuple[Table, bytes]]: """ From 59fdf82df7a1745fa3886a92331fcdfb0298e638 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Tue, 9 Jul 2024 03:01:03 +0300 Subject: [PATCH 22/24] Fix mypy --- ch_backup/logic/table.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index e0cdd36f..87db7f41 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -47,7 +47,7 @@ def __init__(self, threads: int): def __enter__(self): return self - def __exit__(self, *exc: Any) -> bool: + def __exit__(self, *exc: Any) -> bool: # type: ignore self._thread_pool.shutdown(wait=False) self._futures.clear() return False From 2f12cb7909a5e837273c6cbdd02b6f026b446042 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Tue, 9 Jul 2024 13:07:59 +0300 Subject: [PATCH 23/24] Fix black --- ch_backup/logic/table.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index 87db7f41..c985e5d8 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -47,7 +47,7 @@ def __init__(self, threads: int): def __enter__(self): return self - def __exit__(self, *exc: Any) -> bool: # type: ignore + def __exit__(self, *exc: Any) -> bool: # type: ignore self._thread_pool.shutdown(wait=False) self._futures.clear() return False From 87f9d305af8c9d3ac0f3e947842cf67432a695fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Tue, 9 Jul 2024 17:59:17 +0300 Subject: [PATCH 24/24] Move freeze logic to the method --- ch_backup/ch_backup.py | 7 +- ch_backup/clickhouse/control.py | 6 + ch_backup/logic/table.py | 265 ++++++++++++++----------------- tests/unit/test_backup_tables.py | 15 +- 4 files changed, 137 insertions(+), 156 deletions(-) diff --git a/ch_backup/ch_backup.py b/ch_backup/ch_backup.py index 8c4db4ac..53825e85 100644 --- a/ch_backup/ch_backup.py +++ b/ch_backup/ch_backup.py @@ -48,9 +48,7 @@ def 
__init__(self, config: Config) -> None: self._config = config self._access_backup_manager = AccessBackup() self._database_backup_manager = DatabaseBackup() - self._table_backup_manager = TableBackup( - self._config["multiprocessing"]["freeze_threads"] - ) + self._table_backup_manager = TableBackup() self._udf_backup_manager = UDFBackup() self._nc_backup_manager = NamedCollectionsBackup() @@ -194,6 +192,9 @@ def backup( databases, db_tables, schema_only=sources.schema_only, + freeze_threads=self._config["multiprocessing"][ + "freeze_threads" + ], ) self._context.backup_meta.state = BackupState.CREATED diff --git a/ch_backup/clickhouse/control.py b/ch_backup/clickhouse/control.py index 10714154..70ff489e 100644 --- a/ch_backup/clickhouse/control.py +++ b/ch_backup/clickhouse/control.py @@ -481,6 +481,11 @@ def remove_freezed_data( """ Remove all freezed partitions from all local disks. """ + if not (backup_name is None) == (table is None): + raise RuntimeError( + "Both backup_name and table should be None or not None at the same time" + ) + if backup_name and table: for table_data_path, disk in table.paths_with_disks: if disk.type == "local": @@ -793,6 +798,7 @@ def create_shadow_increment(self) -> None: """ Create shadow/increment.txt to fix race condition with parallel freeze. Must be used before freezing more than one table at once. + https://github.com/ClickHouse/ClickHouse/blob/597a72fd9afd88984abc10b284624c6b4d08368b/src/Common/Increment.h#L20 """ default_shadow_path = Path(self._root_data_path) / "shadow" increment_path = default_shadow_path / "increment.txt" diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index c985e5d8..0e8c65ad 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -9,7 +9,7 @@ from functools import partial from itertools import chain from pathlib import Path -from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple +from typing import Dict, Iterable, List, Optional, Sequence, Tuple from ch_backup import logging from ch_backup.backup.deduplication import deduplicate_parts @@ -35,122 +35,6 @@ from ch_backup.util import compare_schema, get_table_zookeeper_paths -class TableFreezer: - """ - Class to freeze tables in parallel. - """ - - def __init__(self, threads: int): - self._futures: List[Future] = [] - self._thread_pool: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=threads) - - def __enter__(self): - return self - - def __exit__(self, *exc: Any) -> bool: # type: ignore - self._thread_pool.shutdown(wait=False) - self._futures.clear() - return False - - def wait(self) -> List[Tuple[Table, bytes]]: - """ - Wait for all submitted tables to freeze and return create statements. - """ - result = [] - for future in as_completed(self._futures): - backup_freezed_table = future.result() - if backup_freezed_table is not None: - result.append(backup_freezed_table) - self._futures.clear() - return result - - def freeze_table( - self, - context: BackupContext, - db: Database, - table: Table, - backup_name: str, - schema_only: bool, - ) -> None: - """ - Submit table to the freeze pool. 
- """ - # Create shadow/increment.txt if not exists manually to avoid - # race condition with parallel freeze - context.ch_ctl.create_shadow_increment() - future = self._thread_pool.submit( - self._freeze_table, - context, - db, - table, - backup_name, - schema_only, - ) - self._futures.append(future) - - def _freeze_table( - self, - context: BackupContext, - db: Database, - table: Table, - backup_name: str, - schema_only: bool, - ) -> Optional[Tuple[Table, bytes]]: - """ - Freeze table and return it's create statement - """ - logging.debug('Trying to freeze "{}"."{}"', table.database, table.name) - - create_statement = self._load_create_statement_from_disk(table) - if not create_statement: - logging.warning( - 'Skipping table backup for "{}"."{}". Local metadata is empty or absent', - db.name, - table.name, - ) - return None - - # Freeze only MergeTree tables - if not schema_only and is_merge_tree(table.engine): - try: - context.ch_ctl.freeze_table(backup_name, table) - except ClickhouseError: - if context.ch_ctl.does_table_exist(table.database, table.name): - raise - - logging.warning( - 'Table "{}"."{}" was removed by a user during backup', - table.database, - table.name, - ) - return None - - return (table, create_statement) - - @staticmethod - def _load_create_statement_from_disk(table: Table) -> Optional[bytes]: - """ - Load a create statement of the table from a metadata file on the disk. - """ - if not table.metadata_path: - logging.debug( - 'Cannot load a create statement of the table "{}"."{}". Metadata is empty', - table.database, - table.name, - ) - return None - try: - return Path(table.metadata_path).read_bytes() - except OSError as e: - logging.debug( - 'Cannot load a create statement of the table "{}"."{}": {}', - table.database, - table.name, - str(e), - ) - return None - - @dataclass class TableMetadataMtime: """ @@ -166,15 +50,13 @@ class TableBackup(BackupManager): Table backup class """ - def __init__(self, freeze_threads: int = 1): - self._freeze_threads = freeze_threads - def backup( self, context: BackupContext, databases: Sequence[Database], db_tables: Dict[str, list], schema_only: bool, + freeze_threads: int, ) -> None: """ Backup tables metadata, MergeTree data and Cloud storage metadata. @@ -182,16 +64,15 @@ def backup( backup_name = context.backup_meta.get_sanitized_name() - with TableFreezer(self._freeze_threads) as table_freezer: - for db in databases: - self._backup( - table_freezer, - context, - db, - db_tables[db.name], - backup_name, - schema_only, - ) + for db in databases: + self._backup( + context, + db, + db_tables[db.name], + backup_name, + schema_only, + freeze_threads, + ) self._backup_cloud_storage_metadata(context) @@ -222,12 +103,12 @@ def _collect_local_metadata_mtime( def _backup( self, - table_freezer: TableFreezer, context: BackupContext, db: Database, tables: Sequence[str], backup_name: str, schema_only: bool, + freeze_threads: int, ) -> None: """ Backup single database tables. @@ -238,44 +119,130 @@ def _backup( # To ensure consistency between metadata and data backups. 
         # See https://en.wikipedia.org/wiki/Optimistic_concurrency_control
         mtimes = self._collect_local_metadata_mtime(context, db, tables)
+        tables_ = list(
+            filter(
+                lambda table: table.name in mtimes,
+                context.ch_ctl.get_tables(db.name, tables),
+            )
+        )
 
-        for table in context.ch_ctl.get_tables(db.name, tables):
-            if table.name not in mtimes:
-                continue
+        freezed_tables = self._freeze_tables(
+            context, db, tables_, backup_name, schema_only, freeze_threads
+        )
 
-            logging.debug(
-                'Adding "{}"."{}" to the freeze queue',
-                table.database,
-                table.name,
-            )
+        logging.debug('All tables from "{}" are frozen', db.name)
 
-            table_freezer.freeze_table(
+        for table, create_statement in freezed_tables:
+            self._backup_freezed_table(
                 context,
                 db,
                 table,
                 backup_name,
                 schema_only,
+                mtimes,
+                create_statement,
             )
 
-        # Wait until all tables are frozen
-        freezed_tables = table_freezer.wait()
-
-        logging.debug('All tables from "{}" are frozen', db.name)
-
-        for table, create_statement in freezed_tables:
-            self._backup_freezed_table(
-                context,
-                db,
-                table,
-                backup_name,
-                schema_only,
-                mtimes,
-                create_statement,
-            )
-
         context.ch_ctl.remove_freezed_data()
 
         context.backup_layout.upload_backup_metadata(context.backup_meta)
 
+    @staticmethod
+    def _freeze_tables(
+        context: BackupContext,
+        db: Database,
+        tables: Sequence[Table],
+        backup_name: str,
+        schema_only: bool,
+        threads: int,
+    ) -> List[Tuple[Table, bytes]]:
+        """
+        Freeze tables in parallel.
+        """
+        # Create shadow/increment.txt if not exists manually to avoid
+        # race condition with parallel freeze
+        context.ch_ctl.create_shadow_increment()
+        futures: List[Future] = []
+        with ThreadPoolExecutor(max_workers=threads) as pool:
+            for table in tables:
+                future = pool.submit(
+                    TableBackup._freeze_table,
+                    context,
+                    db,
+                    table,
+                    backup_name,
+                    schema_only,
+                )
+                futures.append(future)
+
+        result: List[Tuple[Table, bytes]] = []
+        for future in as_completed(futures):
+            table_and_statement = future.result()
+            if table_and_statement is not None:
+                result.append(table_and_statement)
+        return result
+
+    @staticmethod
+    def _freeze_table(
+        context: BackupContext,
+        db: Database,
+        table: Table,
+        backup_name: str,
+        schema_only: bool,
+    ) -> Optional[Tuple[Table, bytes]]:
+        """
+        Freeze table and return its create statement.
+        """
+        logging.debug('Trying to freeze "{}"."{}"', table.database, table.name)
+
+        create_statement = TableBackup._load_create_statement_from_disk(table)
+        if not create_statement:
+            logging.warning(
+                'Skipping table backup for "{}"."{}". Local metadata is empty or absent',
+                db.name,
+                table.name,
+            )
+            return None
+
+        # Freeze only MergeTree tables
+        if not schema_only and is_merge_tree(table.engine):
+            try:
+                context.ch_ctl.freeze_table(backup_name, table)
+            except ClickhouseError:
+                if context.ch_ctl.does_table_exist(table.database, table.name):
+                    raise
+
+                logging.warning(
+                    'Table "{}"."{}" was removed by a user during backup',
+                    table.database,
+                    table.name,
+                )
+                return None
+
+        return (table, create_statement)
+
+    @staticmethod
+    def _load_create_statement_from_disk(table: Table) -> Optional[bytes]:
+        """
+        Load a create statement of the table from a metadata file on the disk.
+        """
+        if not table.metadata_path:
+            logging.debug(
+                'Cannot load a create statement of the table "{}"."{}". 
Metadata is empty',
+                table.database,
+                table.name,
+            )
+            return None
+        try:
+            return Path(table.metadata_path).read_bytes()
+        except OSError as e:
+            logging.debug(
+                'Cannot load a create statement of the table "{}"."{}": {}',
+                table.database,
+                table.name,
+                str(e),
+            )
+            return None
 
     @staticmethod
     def _backup_cloud_storage_metadata(context: BackupContext) -> None:
diff --git a/tests/unit/test_backup_tables.py b/tests/unit/test_backup_tables.py
index f3f77d49..ca4c641f 100644
--- a/tests/unit/test_backup_tables.py
+++ b/tests/unit/test_backup_tables.py
@@ -1,4 +1,4 @@
-from typing import Dict, List
+from typing import List
 from unittest.mock import Mock, patch
 
 import pytest
@@ -28,8 +28,7 @@ def test_backup_table_skipping_if_metadata_updated_during_backup(
     # Prepare involved data objects
     context = BackupContext(DEFAULT_CONFIG)  # type: ignore[arg-type]
     db = Database(db_name, "MergeTree", "/var/lib/clickhouse/metadata/db1.sql")
-    multiprocessing_conf: Dict[str, int] = DEFAULT_CONFIG.get("multiprocessing")  # type: ignore[assignment]
-    table_backup = TableBackup(multiprocessing_conf.get("freeze_threads", 1))
+    table_backup = TableBackup()
     backup_meta = BackupMetadata(
         name="20181017T210300",
         path="ch_backup/20181017T210300",
@@ -67,7 +66,15 @@ def test_backup_table_skipping_if_metadata_updated_during_backup(
     with patch("os.path.getmtime", side_effect=mtime), patch(
         "ch_backup.logic.table.Path", read_bytes=read_bytes_mock
     ):
-        table_backup.backup(context, [db], {db_name: [table_name]}, schema_only=False)
+        table_backup.backup(
+            context,
+            [db],
+            {db_name: [table_name]},
+            schema_only=False,
+            freeze_threads=DEFAULT_CONFIG["multiprocessing"][
+                "freeze_threads"
+            ],  # type: ignore
+        )
 
     assert len(context.backup_meta.get_tables(db_name)) == backups_expected
     # One call after each table and one after database is backed up
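
For reference, the freeze fan-out this series converges on (see
TableBackup._freeze_tables in the last patch) reduces to the following
pattern; the sketch below is simplified and illustrative, with made-up
names, not the project code:

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def freeze_all(tables, freeze_one, threads=4):
        # Submit one freeze task per table; the pool bounds concurrency.
        with ThreadPoolExecutor(max_workers=threads) as pool:
            futures = [pool.submit(freeze_one, table) for table in tables]
        # Collect (table, create_statement) pairs; None marks tables that
        # disappeared or had no local metadata and were skipped.
        results = []
        for future in as_completed(futures):
            item = future.result()
            if item is not None:
                results.append(item)
        return results

Freezing runs in parallel, while the per-table backup loop that follows
stays sequential.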