diff --git a/ch_backup/ch_backup.py b/ch_backup/ch_backup.py
index bec5b378..53825e85 100644
--- a/ch_backup/ch_backup.py
+++ b/ch_backup/ch_backup.py
@@ -192,6 +192,9 @@ def backup(
                 databases,
                 db_tables,
                 schema_only=sources.schema_only,
+                freeze_threads=self._config["multiprocessing"][
+                    "freeze_threads"
+                ],
             )
 
             self._context.backup_meta.state = BackupState.CREATED
diff --git a/ch_backup/clickhouse/control.py b/ch_backup/clickhouse/control.py
index 209f7d05..70ff489e 100644
--- a/ch_backup/clickhouse/control.py
+++ b/ch_backup/clickhouse/control.py
@@ -24,6 +24,7 @@
 from ch_backup.exceptions import ClickhouseBackupError
 from ch_backup.util import (
     chown_dir_contents,
+    chown_file,
     escape,
     list_dir_files,
     retry,
@@ -474,15 +475,32 @@ def system_unfreeze(self, backup_name: str) -> None:
         query_sql = SYSTEM_UNFREEZE_SQL.format(backup_name=backup_name)
         self._ch_client.query(query_sql, timeout=self._unfreeze_timeout)
 
-    def remove_freezed_data(self) -> None:
+    def remove_freezed_data(
+        self, backup_name: Optional[str] = None, table: Optional[Table] = None
+    ) -> None:
         """
         Remove all freezed partitions from all local disks.
         """
-        for disk in self._disks.values():
-            if disk.type == "local":
-                shadow_path = os.path.join(disk.path, "shadow")
-                logging.debug("Removing shadow data: {}", shadow_path)
-                self._remove_shadow_data(shadow_path)
+        if not (backup_name is None) == (table is None):
+            raise RuntimeError(
+                "Both backup_name and table should be None or not None at the same time"
+            )
+
+        if backup_name and table:
+            for table_data_path, disk in table.paths_with_disks:
+                if disk.type == "local":
+                    table_relative_path = os.path.relpath(table_data_path, disk.path)
+                    shadow_path = os.path.join(
+                        disk.path, "shadow", backup_name, table_relative_path
+                    )
+                    logging.debug("Removing shadow data: {}", shadow_path)
+                    self._remove_shadow_data(shadow_path)
+        else:
+            for disk in self._disks.values():
+                if disk.type == "local":
+                    shadow_path = os.path.join(disk.path, "shadow")
+                    logging.debug("Removing shadow data: {}", shadow_path)
+                    self._remove_shadow_data(shadow_path)
 
     def remove_freezed_part(self, part: FrozenPart) -> None:
         """
@@ -776,6 +794,27 @@ def chown_dir(self, dir_path: str) -> None:
             need_recursion,
         )
 
+    def create_shadow_increment(self) -> None:
+        """
+        Create shadow/increment.txt to fix race condition with parallel freeze.
+        Must be used before freezing more than one table at once.
+        https://github.com/ClickHouse/ClickHouse/blob/597a72fd9afd88984abc10b284624c6b4d08368b/src/Common/Increment.h#L20
+        """
+        default_shadow_path = Path(self._root_data_path) / "shadow"
+        increment_path = default_shadow_path / "increment.txt"
+        if os.path.exists(increment_path):
+            return
+        if not os.path.exists(default_shadow_path):
+            os.mkdir(default_shadow_path)
+            self.chown_dir(str(default_shadow_path))
+        with open(increment_path, "w", encoding="utf-8") as file:
+            file.write("0")
+        chown_file(
+            self._ch_ctl_config["user"],
+            self._ch_ctl_config["group"],
+            str(increment_path),
+        )
+
     @retry(OSError)
     def _remove_shadow_data(self, path: str) -> None:
         if path.find("/shadow") == -1:
diff --git a/ch_backup/config.py b/ch_backup/config.py
index 036d86b5..1658096f 100644
--- a/ch_backup/config.py
+++ b/ch_backup/config.py
@@ -156,6 +156,8 @@ def _as_seconds(t: str) -> int:
         "workers": 4,
         # The number of processes for parts restoring from S3 disks.
"cloud_storage_restore_workers": 4, + # The number of threads for parallel freeze of tables + "freeze_threads": 4, }, "pipeline": { # Is asynchronous pipelines used (based on Pypeln library) diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index d752bdc3..0e8c65ad 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -4,6 +4,7 @@ import os from collections import deque +from concurrent.futures import Future, ThreadPoolExecutor, as_completed from dataclasses import dataclass from functools import partial from itertools import chain @@ -55,10 +56,12 @@ def backup( databases: Sequence[Database], db_tables: Dict[str, list], schema_only: bool, + freeze_threads: int, ) -> None: """ Backup tables metadata, MergeTree data and Cloud storage metadata. """ + backup_name = context.backup_meta.get_sanitized_name() for db in databases: @@ -68,7 +71,9 @@ def backup( db_tables[db.name], backup_name, schema_only, + freeze_threads, ) + self._backup_cloud_storage_metadata(context) def _collect_local_metadata_mtime( @@ -103,6 +108,7 @@ def _backup( tables: Sequence[str], backup_name: str, schema_only: bool, + freeze_threads: int, ) -> None: """ Backup single database tables. @@ -113,22 +119,131 @@ def _backup( # To ensure consistency between metadata and data backups. # See https://en.wikipedia.org/wiki/Optimistic_concurrency_control mtimes = self._collect_local_metadata_mtime(context, db, tables) + tables_ = list( + filter( + lambda table: table.name in mtimes, + context.ch_ctl.get_tables(db.name, tables), + ) + ) - for table in context.ch_ctl.get_tables(db.name, tables): - if table.name not in mtimes: - continue + freezed_tables = self._freeze_tables( + context, db, tables_, backup_name, schema_only, freeze_threads + ) - self._backup_table( + logging.debug('All tables from "{}" are frozen', db.name) + + for table, create_statement in freezed_tables: + self._backup_freezed_table( context, db, table, backup_name, schema_only, mtimes, + create_statement, ) + context.ch_ctl.remove_freezed_data() + context.backup_layout.upload_backup_metadata(context.backup_meta) + @staticmethod + def _freeze_tables( + context: BackupContext, + db: Database, + tables: Sequence[Table], + backup_name: str, + schema_only: bool, + threads: int, + ) -> List[Tuple[Table, bytes]]: + """ + Freeze tables in parallel. + """ + # Create shadow/increment.txt if not exists manually to avoid + # race condition with parallel freeze + context.ch_ctl.create_shadow_increment() + futures: List[Future] = [] + with ThreadPoolExecutor(max_workers=threads) as pool: + for table in tables: + future = pool.submit( + TableBackup._freeze_table, + context, + db, + table, + backup_name, + schema_only, + ) + futures.append(future) + + result: List[Tuple[Table, bytes]] = [] + for future in as_completed(futures): + table_and_statement = future.result() + if table_and_statement is not None: + result.append(table_and_statement) + return result + + @staticmethod + def _freeze_table( + context: BackupContext, + db: Database, + table: Table, + backup_name: str, + schema_only: bool, + ) -> Optional[Tuple[Table, bytes]]: + """ + Freeze table and return it's create statement + """ + logging.debug('Trying to freeze "{}"."{}"', table.database, table.name) + + create_statement = TableBackup._load_create_statement_from_disk(table) + if not create_statement: + logging.warning( + 'Skipping table backup for "{}"."{}". 
+                db.name,
+                table.name,
+            )
+            return None
+
+        # Freeze only MergeTree tables
+        if not schema_only and is_merge_tree(table.engine):
+            try:
+                context.ch_ctl.freeze_table(backup_name, table)
+            except ClickhouseError:
+                if context.ch_ctl.does_table_exist(table.database, table.name):
+                    raise
+
+                logging.warning(
+                    'Table "{}"."{}" was removed by a user during backup',
+                    table.database,
+                    table.name,
+                )
+                return None
+
+        return (table, create_statement)
+
+    @staticmethod
+    def _load_create_statement_from_disk(table: Table) -> Optional[bytes]:
+        """
+        Load a create statement of the table from a metadata file on the disk.
+        """
+        if not table.metadata_path:
+            logging.debug(
+                'Cannot load a create statement of the table "{}"."{}". Metadata is empty',
+                table.database,
+                table.name,
+            )
+            return None
+        try:
+            return Path(table.metadata_path).read_bytes()
+        except OSError as e:
+            logging.debug(
+                'Cannot load a create statement of the table "{}"."{}": {}',
+                table.database,
+                table.name,
+                str(e),
+            )
+            return None
+
     @staticmethod
     def _backup_cloud_storage_metadata(context: BackupContext) -> None:
         """
@@ -272,30 +387,7 @@ def restore(
             keep_going=keep_going,
         )
 
-    @staticmethod
-    def _load_create_statement_from_disk(table: Table) -> Optional[bytes]:
-        """
-        Load a create statement of the table from a metadata file on the disk.
-        """
-        if not table.metadata_path:
-            logging.debug(
-                'Cannot load a create statement of the table "{}"."{}". Metadata is empty',
-                table.database,
-                table.name,
-            )
-            return None
-        try:
-            return Path(table.metadata_path).read_bytes()
-        except OSError as e:
-            logging.debug(
-                'Cannot load a create statement of the table "{}"."{}": {}',
-                table.database,
-                table.name,
-                str(e),
-            )
-            return None
-
-    def _backup_table(
+    def _backup_freezed_table(
         self,
         context: BackupContext,
         db: Database,
@@ -303,39 +395,8 @@ def _backup_table(
         backup_name: str,
         schema_only: bool,
         mtimes: Dict[str, TableMetadataMtime],
+        create_statement: bytes,
     ) -> None:
-        """
-        Make backup of metadata and data of single table.
-
-        Return backup metadata of successfully backuped table, otherwise None.
-        """
-        logging.debug(
-            'Performing table backup for "{}"."{}"', table.database, table.name
-        )
-        create_statement = self._load_create_statement_from_disk(table)
-        if not create_statement:
-            logging.warning(
-                'Skipping table backup for "{}"."{}". Local metadata is empty or absent',
-                db.name,
-                table.name,
-            )
-            return
-
-        # Freeze only MergeTree tables
-        if not schema_only and is_merge_tree(table.engine):
-            try:
-                context.ch_ctl.freeze_table(backup_name, table)
-            except ClickhouseError:
-                if context.ch_ctl.does_table_exist(table.database, table.name):
-                    raise
-
-                logging.warning(
-                    'Table "{}"."{}" was removed by a user during backup',
-                    table.database,
-                    table.name,
-                )
-                return
-
         # Check if table metadata was updated
         new_mtime = self._get_mtime(table.metadata_path)
         if new_mtime is None or mtimes[table.name].mtime != new_mtime:
@@ -344,9 +405,12 @@
                 table.database,
                 table.name,
             )
-            context.ch_ctl.remove_freezed_data()
+            context.ch_ctl.remove_freezed_data(backup_name, table)
             return
 
+        logging.debug(
+            'Performing table backup for "{}"."{}"', table.database, table.name
+        )
         # Add table metadata to backup metadata
         context.backup_meta.add_table(
             TableMetadata(table.database, table.name, table.engine, table.uuid)
@@ -436,7 +500,7 @@ def deduplicate_parts_in_batch(
 
         self._validate_uploaded_parts(context, upload_observer.uploaded_parts)
 
-        context.ch_ctl.remove_freezed_data()
+        context.ch_ctl.remove_freezed_data(backup_name, table)
 
     @staticmethod
     def _validate_uploaded_parts(context: BackupContext, uploaded_parts: list) -> None:
diff --git a/ch_backup/util.py b/ch_backup/util.py
index 8070781a..1b23bb71 100644
--- a/ch_backup/util.py
+++ b/ch_backup/util.py
@@ -78,6 +78,13 @@ def chown_dir_contents(
         shutil.chown(os.path.join(dir_path, path), user, group)
 
 
+def chown_file(user: str, group: str, file_path: str) -> None:
+    """
+    Change file user/group
+    """
+    shutil.chown(file_path, user, group)
+
+
 def list_dir_files(dir_path: str) -> List[str]:
     """
     Returns paths of all files of directory (recursively), relative to its path
diff --git a/tests/integration/ch_backup.featureset b/tests/integration/ch_backup.featureset
index 163306d8..ddcb56e8 100644
--- a/tests/integration/ch_backup.featureset
+++ b/tests/integration/ch_backup.featureset
@@ -12,6 +12,7 @@ features/cloud_storage.feature
 features/database_engines.feature
 features/ddl_dictionary.feature
 features/deduplication.feature
+features/freeze_parallel.feature
 features/incremental_restore.feature
 features/metadata.feature
 features/min_interval.feature
diff --git a/tests/integration/features/freeze_parallel.feature b/tests/integration/features/freeze_parallel.feature
new file mode 100644
index 00000000..4505964e
--- /dev/null
+++ b/tests/integration/features/freeze_parallel.feature
@@ -0,0 +1,31 @@
+Feature: Parallel freeze
+
+  Background:
+    Given default configuration
+    And a working s3
+    And a working zookeeper on zookeeper01
+    And a working clickhouse on clickhouse01
+    And a working clickhouse on clickhouse02
+    And clickhouse on clickhouse01 has test schema with 5 databases and 10 tables
+    And clickhouse01 has test clickhouse data test1 with 5 databases, 10 tables, 100 rows and 5 partitions
+
+  Scenario: Create backup with single freeze worker
+    Given ch-backup configuration on clickhouse01
+    """
+    multiprocessing:
+        freeze_threads: 1
+    """
+    When we create clickhouse01 clickhouse backup
+    Then we got the following backups on clickhouse01
+      | num | state   | data_count | link_count | title  |
+      | 0   | created | 250        | 0          | shared |
+    When we restore clickhouse backup #0 to clickhouse02
+    Then we got same clickhouse data at clickhouse01 clickhouse02
+
+  Scenario: Create backup with default number of freeze workers
+    When we create clickhouse01 clickhouse backup
+    Then we got the following backups on clickhouse01
+      | num | state   | data_count | link_count | title  |
+      | 0   | created | 250        | 0          | shared |
+    When we restore clickhouse backup #0 to clickhouse02
+    Then we got same clickhouse data at clickhouse01 clickhouse02
diff --git a/tests/integration/modules/clickhouse.py b/tests/integration/modules/clickhouse.py
index f0989dd9..a37dd000 100644
--- a/tests/integration/modules/clickhouse.py
+++ b/tests/integration/modules/clickhouse.py
@@ -4,8 +4,8 @@
 
 import logging
 from copy import copy
-from datetime import datetime
-from typing import Any, Sequence, Tuple, Union
+from datetime import datetime, timedelta
+from typing import Any, List, Sequence, Tuple, Union
 from urllib.parse import urljoin
 
 from pkg_resources import parse_version
@@ -18,6 +18,7 @@
 DB_COUNT = 2
 TABLE_COUNT = 2
 ROWS_COUNT = 3
+PARTITIONS_COUNT = 1
 
 ACCESS_TYPES = [
     ("users", "USER"),
@@ -91,48 +92,63 @@ def ch_version_ge(self, comparing_version: str) -> bool:
         """
         return parse_version(self.get_version()) >= parse_version(comparing_version)  # type: ignore
 
-    def init_schema(self) -> None:
+    def init_schema(
+        self,
+        db_count: int = DB_COUNT,
+        table_count: int = TABLE_COUNT,
+    ) -> None:
         """
         Create test schema.
         """
-        for db_num in range(1, DB_COUNT + 1):
+        for db_num in range(1, db_count + 1):
             db_name = f"test_db_{db_num:02d}"
             self._query("POST", f"CREATE DATABASE IF NOT EXISTS {db_name}")
 
-            for table_num in range(1, TABLE_COUNT + 1):
+            for table_num in range(1, table_count + 1):
                 table_name = f"test_table_{table_num:02d}"
                 query = f"""
                     CREATE TABLE IF NOT EXISTS `{db_name}`.`{table_name}` (
                         date Date,
                         datetime DateTime,
                         int_num UInt32,
+                        prefix String,
                         str String
                     )
                     ENGINE MergeTree
-                    PARTITION BY date
+                    PARTITION BY (date, prefix)
                     ORDER BY int_num
                     """
                 self._query("POST", query)
 
-    def init_data(self, mark: str) -> None:
+    def init_data(
+        self,
+        mark: str,
+        db_count: int = DB_COUNT,
+        table_count: int = TABLE_COUNT,
+        rows_count: int = ROWS_COUNT,
+        partitions_count: int = PARTITIONS_COUNT,
+    ) -> None:
         """
         Fill test schema with data
         """
-        for db_num in range(1, DB_COUNT + 1):
+        for db_num in range(1, db_count + 1):
             db_name = self._get_test_db_name(db_num)
-            for table_num in range(1, TABLE_COUNT + 1):
-                rows = []
+            for table_num in range(1, table_count + 1):
                 table_name = self._get_test_table_name(table_num)
-                for row_num in range(1, ROWS_COUNT + 1):
-                    rows.append(
-                        ", ".join(self._gen_record(row_num=row_num, str_prefix=mark))
-                    )
+                rows = self._gen_rows(
+                    rows_count=rows_count,
+                    str_prefix=mark,
+                    partitions_count=partitions_count,
+                )
                 self._query(
                     "POST",
-                    f"INSERT INTO {db_name}.{table_name} FORMAT CSV",
+                    f"INSERT INTO `{db_name}`.`{table_name}` FORMAT CSV",
                     data="\n".join(rows),
                 )
+                # Make all possible merges to make tests more deterministic
+                self._query("POST", f"OPTIMIZE TABLE `{db_name}`.`{table_name}`")
+
 
     def get_all_user_data(self) -> Tuple[int, dict]:
         """
         Retrieve all user data.
@@ -322,23 +338,39 @@ def _get_test_table_name(table_num: int) -> str:
         return f"test_table_{table_num:02d}"
 
     @staticmethod
-    def _gen_record(row_num=0, str_len=5, str_prefix=None):
+    def _gen_rows(
+        rows_count=ROWS_COUNT,
+        str_len=5,
+        str_prefix=None,
+        partitions_count=PARTITIONS_COUNT,
+    ):
         """
-        Generate test record.
+        Generate test rows.
""" + rows: List[str] = [] + if str_prefix is None: str_prefix = "" else: str_prefix = f"{str_prefix}_" - rand_str = generate_random_string(str_len) - + dates: List[datetime] = [] dt_now = datetime.utcnow() - row = ( - dt_now.strftime("%Y-%m-%d"), - dt_now.strftime("%Y-%m-%d %H:%M:%S"), - str(row_num), - f"{str_prefix}{rand_str}", - ) + # PARTITION BY date + for i in range(partitions_count): + date = dt_now + timedelta(days=i) + dates.append(date) + + for row_num in range(1, rows_count + 1): + rand_str = generate_random_string(str_len) + date = dates[row_num % partitions_count] + row = ( + date.strftime("%Y-%m-%d"), + date.strftime("%Y-%m-%d %H:%M:%S"), + str(row_num), + f"{str_prefix}", + f"{rand_str}", + ) + rows.append(", ".join(row)) - return row + return rows diff --git a/tests/integration/steps/clickhouse.py b/tests/integration/steps/clickhouse.py index 9cf9b245..91fffe64 100644 --- a/tests/integration/steps/clickhouse.py +++ b/tests/integration/steps/clickhouse.py @@ -30,6 +30,19 @@ def step_init_test_schema(context, node): ClickhouseClient(context, node).init_schema() +@given( + "clickhouse on {node:w} has test schema with {db_count:d} databases and {tables_count:d} tables" +) +@when( + "clickhouse on {node:w} has test schema with {db_count:d} databases and {tables_count:d} tables" +) +def step_init_test_schema_custom(context, node, db_count, tables_count): + """ + Load test schema to clickhouse. + """ + ClickhouseClient(context, node).init_schema(db_count, tables_count) + + @when('we put following info in "{path}" at {node:w}') def step_put_file(context, path, node): container = get_container(context, node) @@ -45,6 +58,33 @@ def step_fill_with_test_data(context, node, test_name): ClickhouseClient(context, node).init_data(mark=test_name) +@given( + "{node:w} has test clickhouse data {test_name:w} with {db_count:d} databases, {tables_count:d} tables, {rows_count:d} rows and {partitions_count:d} partitions" +) +@when( + "{node:w} has test clickhouse data {test_name:w} with {db_count:d} databases, {tables_count:d} tables, {rows_count:d} rows and {partitions_count:d} partitions" +) +def step_fill_with_test_data_custom( + context, + node, + test_name, + db_count, + tables_count, + rows_count, + partitions_count, +): + """ + Load test data to clickhouse. + """ + ClickhouseClient(context, node).init_data( + mark=test_name, + db_count=db_count, + table_count=tables_count, + rows_count=rows_count, + partitions_count=partitions_count, + ) + + @given("we execute query on {node:w}") @when("we execute query on {node:w}") def step_test_request(context, node): diff --git a/tests/unit/test_backup_tables.py b/tests/unit/test_backup_tables.py index 50ce2304..ca4c641f 100644 --- a/tests/unit/test_backup_tables.py +++ b/tests/unit/test_backup_tables.py @@ -66,7 +66,16 @@ def test_backup_table_skipping_if_metadata_updated_during_backup( with patch("os.path.getmtime", side_effect=mtime), patch( "ch_backup.logic.table.Path", read_bytes=read_bytes_mock ): - table_backup.backup(context, [db], {db_name: [table_name]}, schema_only=False) + table_backup.backup( + context, + [db], + {db_name: [table_name]}, + schema_only=False, + freeze_threads=DEFAULT_CONFIG["multiprocessing"][ + "freeze_threads" + ], # type: ignore + ) assert len(context.backup_meta.get_tables(db_name)) == backups_expected - assert clickhouse_ctl_mock.remove_freezed_data.call_count == 1 + # One call after each table and one after database is backuped + assert clickhouse_ctl_mock.remove_freezed_data.call_count == 2