diff --git a/ch_backup/backup/deduplication.py b/ch_backup/backup/deduplication.py index 76750d22..69375acf 100644 --- a/ch_backup/backup/deduplication.py +++ b/ch_backup/backup/deduplication.py @@ -5,7 +5,7 @@ from collections import defaultdict from copy import copy from datetime import timedelta -from typing import Dict, List, Optional, Sequence, Set +from typing import Dict, List, Sequence, Set from ch_backup import logging from ch_backup.backup.layout import BackupLayout @@ -22,6 +22,9 @@ class PartDedupInfo(Slotted): """ __slots__ = ( + "database", + "table", + "name", "backup_path", "checksum", "size", @@ -31,8 +34,12 @@ class PartDedupInfo(Slotted): "verified", ) + # pylint: disable=too-many-arguments def __init__( self, + database: str, + table: str, + name: str, backup_path: str, checksum: str, size: int, @@ -41,6 +48,9 @@ def __init__( disk_name: str, verified: bool, ) -> None: + self.database = database + self.table = table + self.name = name self.backup_path = backup_path self.checksum = checksum self.size = size @@ -49,69 +59,41 @@ def __init__( self.disk_name = disk_name self.verified = verified - -TableDedupInfo = Dict[str, PartDedupInfo] - - -class DatabaseDedupInfo: - """ - Information about data parts of single database to use for deduplication and creation of incremental backups. - """ - - def __init__(self, tables: Dict[str, TableDedupInfo] = None) -> None: - if tables is None: - tables = defaultdict(dict) - self._tables = tables - - def table(self, table_name: str) -> TableDedupInfo: + def to_sql(self): """ - Return deduplication information for the table. + Convert to string to use it in insert query """ - return self._tables[table_name] - - def __repr__(self): - return f"DatabaseDedupInfo({dict(self._tables)})" + files_array = "[" + ",".join(f"'{file}'" for file in self.files) + "]" + return f"('{self.database}','{self.table}','{self.name}','{self.backup_path}','{self.checksum}',{self.size},{files_array},{int(self.tarball)},'{self.disk_name}',{int(self.verified)})" - def __eq__(self, other): - return self.__dict__ == other.__dict__ +TableDedupReferences = Set[str] -class DedupInfo: - """ - Information about data parts of all databases to use for deduplication and creation of incremental backups. - """ - - def __init__(self, databases: Dict[str, DatabaseDedupInfo] = None) -> None: - if databases is None: - databases = defaultdict(DatabaseDedupInfo) - self._databases = databases +DatabaseDedupReferences = Dict[str, TableDedupReferences] - def database(self, database_name: str) -> DatabaseDedupInfo: - """ - Return deduplication information for the database. - """ - return self._databases[database_name] +DedupReferences = Dict[str, DatabaseDedupReferences] - def __repr__(self): - return f"DedupInfo({dict(self._databases)})" - def __eq__(self, other): - return self.__dict__ == other.__dict__ +def _create_empty_dedup_references() -> DedupReferences: + """ + Create empty dedup references + """ + return defaultdict(lambda: defaultdict(set)) def collect_dedup_info( context: BackupContext, databases: Sequence[Database], backups_with_light_meta: List[BackupMetadata], -) -> DedupInfo: +) -> None: """ Collect deduplication information for creating incremental backups. """ - dedup_info = DedupInfo() - # Do not populate DedupInfo if we are creating schema-only backup. 
if context.backup_meta.schema_only: - return dedup_info + return + + context.ch_ctl.create_deduplication_table() backup_age_limit = None if context.config.get("deduplicate_parts"): @@ -131,15 +113,11 @@ def collect_dedup_info( dedup_backups.append(backup) _populate_dedup_info( - dedup_info, - context.backup_layout, - context.backup_meta.hostname, + context, dedup_backups, databases, ) - return dedup_info - class _DatabaseToHandle: def __init__(self, name, replicated_tables=False, nonreplicated_tables=False): @@ -156,20 +134,23 @@ def handled(self): def _populate_dedup_info( - dedup_info: DedupInfo, - layout: BackupLayout, - hostname: str, + context: BackupContext, dedup_backups_with_light_meta: List[BackupMetadata], databases: Sequence[Database], ) -> None: # pylint: disable=too-many-locals,too-many-branches + layout = context.backup_layout + # Used to check if part is already collected for deduplication + dedup_info = _create_empty_dedup_references() + dedup_batch_size = context.config["deduplication_batch_size"] + databases_to_handle = {db.name: _DatabaseToHandle(db.name) for db in databases} dedup_backup_paths = set(backup.path for backup in dedup_backups_with_light_meta) for backup in dedup_backups_with_light_meta: backup = layout.reload_backup(backup, use_light_meta=False) # Process only replicated tables if backup is created on replica. - only_replicated = hostname != backup.hostname + only_replicated = context.backup_meta.hostname != backup.hostname databases_to_iterate = [] for db_name in backup.get_databases(): @@ -187,8 +168,9 @@ def _populate_dedup_info( if db.handled: del databases_to_handle[db_name] + dedup_info_batch = [] for db in databases_to_iterate: - db_dedup_info = dedup_info.database(db.name) + db_dedup_info = dedup_info[db.name] for table in backup.get_tables(db.name): replicated = is_replicated(table.engine) if replicated and db.replicated_tables_handled: @@ -198,7 +180,7 @@ def _populate_dedup_info( ): continue - table_dedup_info = db_dedup_info.table(table.name) + table_dedup_info = db_dedup_info[table.name] for part in table.get_parts(): if part.name in table_dedup_info: continue @@ -212,7 +194,10 @@ def _populate_dedup_info( verified = False backup_path = backup.path - table_dedup_info[part.name] = PartDedupInfo( + part_dedup = PartDedupInfo( + database=db.name, + table=table.name, + name=part.name, backup_path=backup_path, checksum=part.checksum, size=part.size, @@ -222,60 +207,65 @@ def _populate_dedup_info( verified=verified, ) + table_dedup_info.add(part.name) + dedup_info_batch.append(part_dedup.to_sql()) + + if len(dedup_info_batch) >= dedup_batch_size: + context.ch_ctl.insert_deduplication_info(dedup_info_batch) + dedup_info_batch.clear() + + if dedup_info_batch: + context.ch_ctl.insert_deduplication_info(dedup_info_batch) + if not databases_to_handle: break -def deduplicate_part( - layout: BackupLayout, fpart: FrozenPart, dedup_info: TableDedupInfo -) -> Optional[PartMetadata]: +def deduplicate_parts( + context: BackupContext, + database: str, + table: str, + frozen_parts: Dict[str, FrozenPart], +) -> Dict[str, PartMetadata]: """ Deduplicate part if it's possible. 
""" - part_name = fpart.name - - logging.debug('Looking for deduplication of part "{}"', part_name) - - existing_part = dedup_info.get(part_name) - if not existing_part: - return None - - if existing_part.checksum != fpart.checksum: - return None - - part = PartMetadata( - database=fpart.database, - table=fpart.table, - name=part_name, - checksum=existing_part.checksum, - size=existing_part.size, - link=existing_part.backup_path, - files=existing_part.files, - tarball=existing_part.tarball, - disk_name=existing_part.disk_name, - ) + layout = context.backup_layout - if not existing_part.verified: - if not layout.check_data_part(existing_part.backup_path, part): - logging.debug( - 'Part "{}" found in "{}", but it\'s invalid, skipping', - part_name, - existing_part.backup_path, - ) - return None - - logging.debug( - 'Part "{}" found in "{}", reusing', part_name, existing_part.backup_path + existing_parts = context.ch_ctl.get_deduplication_info( + database, table, frozen_parts ) + deduplicated_parts: Dict[str, PartMetadata] = {} + + for existing_part in existing_parts: + part = PartMetadata( + database=database, + table=table, + name=existing_part["name"], + checksum=existing_part["checksum"], + size=int(existing_part["size"]), + link=existing_part["backup_path"], + files=existing_part["files"], + tarball=existing_part["tarball"], + disk_name=existing_part["disk_name"], + ) - return part - + if not existing_part["verified"]: + if not layout.check_data_part(existing_part["backup_path"], part): + logging.debug( + 'Part "{}" found in "{}", but it\'s invalid, skipping', + part.name, + existing_part["backup_path"], + ) + continue -TableDedupReferences = Set[str] + deduplicated_parts[part.name] = part -DatabaseDedupReferences = Dict[str, TableDedupReferences] + logging.debug( + 'Part "{}" found in "{}", reusing', part.name, existing_part["backup_path"] + ) -DedupReferences = Dict[str, DatabaseDedupReferences] + return deduplicated_parts def collect_dedup_references_for_batch_backup_deletion( @@ -287,7 +277,9 @@ def collect_dedup_references_for_batch_backup_deletion( Collect deduplication information for deleting multiple backups. It contains names of data parts that should pe preserved during deletion. 
""" - dedup_references: Dict[str, DedupReferences] = defaultdict(dict) + dedup_references: Dict[str, DedupReferences] = defaultdict( + _create_empty_dedup_references + ) deleting_backup_name_resolver = { b.path: b.name for b in deleting_backups_light_meta @@ -312,13 +304,4 @@ def collect_dedup_references_for_batch_backup_deletion( def _add_part_to_dedup_references( dedup_references: DedupReferences, part: PartMetadata ) -> None: - if part.database not in dedup_references: - dedup_references[part.database] = {part.table: {part.name}} - return - - db_dedup_references = dedup_references[part.database] - if part.table not in db_dedup_references: - db_dedup_references[part.table] = {part.name} - return - - db_dedup_references[part.table].add(part.name) + dedup_references[part.database][part.table].add(part.name) diff --git a/ch_backup/backup_context.py b/ch_backup/backup_context.py index f73e210f..6c6203c8 100644 --- a/ch_backup/backup_context.py +++ b/ch_backup/backup_context.py @@ -86,7 +86,9 @@ def ch_ctl(self) -> ClickhouseCTL: Getter ch_ctl """ if not hasattr(self, "_ch_ctl"): - self._ch_ctl = ClickhouseCTL(self._ch_ctl_conf, self._main_conf) + self._ch_ctl = ClickhouseCTL( + self._ch_ctl_conf, self._main_conf, self._config + ) return self._ch_ctl @ch_ctl.setter diff --git a/ch_backup/ch_backup.py b/ch_backup/ch_backup.py index 9da8fccb..bec5b378 100644 --- a/ch_backup/ch_backup.py +++ b/ch_backup/ch_backup.py @@ -69,7 +69,7 @@ def _context(self) -> BackupContext: ctx.ch_ctl_conf = self._config["clickhouse"] ctx.main_conf = self._config["main"] - ctx.ch_ctl = ClickhouseCTL(ctx.ch_ctl_conf, ctx.main_conf) + ctx.ch_ctl = ClickhouseCTL(ctx.ch_ctl_conf, ctx.main_conf, ctx.config) ctx.backup_layout = BackupLayout(self._config) ctx.config = self._config["backup"] @@ -182,7 +182,7 @@ def backup( self._nc_backup_manager.backup(self._context) if sources.schemas_included(): self._database_backup_manager.backup(self._context, databases) - dedup_info = collect_dedup_info( + collect_dedup_info( context=self._context, backups_with_light_meta=backups_with_light_meta, databases=databases, @@ -191,7 +191,6 @@ def backup( self._context, databases, db_tables, - dedup_info, schema_only=sources.schema_only, ) @@ -450,10 +449,10 @@ def _delete( logging.info("Removing non-shared backup data parts") for db_name in backup.get_databases(): - db_dedup_references = dedup_references.get(db_name, {}) + db_dedup_references = dedup_references[db_name] for table in backup.get_tables(db_name): self._delete_data_parts( - backup, table, db_dedup_references.get(table.name) + backup, table, db_dedup_references[table.name] ) self._context.ch_ctl.system_unfreeze(backup.name) diff --git a/ch_backup/clickhouse/client.py b/ch_backup/clickhouse/client.py index 6ca5db69..1ff865e9 100644 --- a/ch_backup/clickhouse/client.py +++ b/ch_backup/clickhouse/client.py @@ -44,12 +44,16 @@ def query( post_data: dict = None, settings: dict = None, timeout: float = None, + log_entry_length: int = None, ) -> Any: """ Execute query. 
""" try: - logging.debug("Executing query: {}", query) + logging.debug( + "Executing query: {}", + query if not log_entry_length else query[:log_entry_length] + "...", + ) if timeout is None: timeout = self.timeout diff --git a/ch_backup/clickhouse/control.py b/ch_backup/clickhouse/control.py index 0774cd5b..209f7d05 100644 --- a/ch_backup/clickhouse/control.py +++ b/ch_backup/clickhouse/control.py @@ -2,6 +2,8 @@ Clickhouse-control classes module """ +# pylint: disable=too-many-lines + import os import shutil from contextlib import contextmanager, suppress @@ -157,6 +159,12 @@ """ ) +TRUNCATE_TABLE_IF_EXISTS_SQL = strip_query( + """ + TRUNCATE TABLE IF EXISTS `{db_name}`.`{table_name}` +""" +) + RESTORE_REPLICA_SQL = strip_query( """ SYSTEM RESTORE REPLICA `{db_name}`.`{table_name}` @@ -170,7 +178,54 @@ engine, metadata_path FROM system.databases - WHERE name NOT IN ('system', '_temporary_and_external_tables', 'information_schema', 'INFORMATION_SCHEMA') + WHERE name NOT IN ('system', '_temporary_and_external_tables', 'information_schema', 'INFORMATION_SCHEMA', '{system_db}') + FORMAT JSON +""" +) + +CREATE_IF_NOT_EXISTS_SYSTEM_DB_SQL = strip_query( + "CREATE DATABASE IF NOT EXISTS `{system_db}`" +) +CREATE_IF_NOT_EXISTS_DEDUP_TABLE_SQL = strip_query( + """ + CREATE TABLE IF NOT EXISTS `{system_db}`._deduplication_info ( + database String, + table String, + name String, + backup_path String, + checksum String, + size Int64, + files Array(String), + tarball Bool, + disk_name String, + verified Bool + ) + ENGINE = MergeTree() + ORDER BY (database, table, name, checksum) +""" +) +CREATE_IF_NOT_EXISTS_DEDUP_TABLE_CURRENT_SQL = strip_query( + """ + CREATE TABLE IF NOT EXISTS `{system_db}`._deduplication_info_current ( + name String, + checksum String, + ) + ENGINE = MergeTree() + ORDER BY (name, checksum) +""" +) + +INSERT_DEDUP_INFO_BATCH_SQL = strip_query( + "INSERT INTO `{system_db}`.`{table}` VALUES {batch}" +) + +GET_DEDUPLICATED_PARTS_SQL = strip_query( + """ + SELECT `{system_db}`._deduplication_info.* FROM `{system_db}`._deduplication_info + JOIN `{system_db}`._deduplication_info_current + ON _deduplication_info.name = _deduplication_info_current.name + AND _deduplication_info.checksum = _deduplication_info_current.checksum + WHERE database='{database}' AND table='{table}' FORMAT JSON """ ) @@ -310,9 +365,12 @@ class ClickhouseCTL: # pylint: disable=too-many-instance-attributes - def __init__(self, ch_ctl_config: dict, main_config: dict) -> None: + def __init__( + self, ch_ctl_config: dict, main_config: dict, backup_config: dict + ) -> None: self._ch_ctl_config = ch_ctl_config self._main_config = main_config + self._backup_config = backup_config self._root_data_path = self._ch_ctl_config["data_path"] self._shadow_data_path = os.path.join(self._root_data_path, "shadow") self._timeout = self._ch_ctl_config["timeout"] @@ -443,7 +501,10 @@ def get_databases( exclude_dbs = [] result: List[Database] = [] - ch_resp = self._ch_client.query(GET_DATABASES_SQL) + system_database = self._backup_config["system_database"] + ch_resp = self._ch_client.query( + GET_DATABASES_SQL.format(system_db=system_database) + ) if "data" in ch_resp: result = [ Database(row["name"], row["engine"], row["metadata_path"]) @@ -851,6 +912,77 @@ def reload_config(self): """ self._ch_client.query(RELOAD_CONFIG_SQL, timeout=self._timeout) + def create_deduplication_table(self): + """ + Create ClickHouse table for deduplication info + """ + self._ch_client.query( + CREATE_IF_NOT_EXISTS_SYSTEM_DB_SQL.format( + 
system_db=escape(self._backup_config["system_database"]) + ) + ) + self._ch_client.query( + TRUNCATE_TABLE_IF_EXISTS_SQL.format( + db_name=escape(self._backup_config["system_database"]), + table_name="_deduplication_info", + ) + ) + self._ch_client.query( + CREATE_IF_NOT_EXISTS_DEDUP_TABLE_SQL.format( + system_db=escape(self._backup_config["system_database"]) + ) + ) + + def insert_deduplication_info(self, batch: List[str]) -> None: + """ + Insert deduplication info in batch + """ + self._ch_client.query( + INSERT_DEDUP_INFO_BATCH_SQL.format( + system_db=escape(self._backup_config["system_database"]), + table="_deduplication_info", + batch=",".join(batch), + ), + log_entry_length=150, + ) + + def get_deduplication_info( + self, database: str, table: str, frozen_parts: Dict[str, FrozenPart] + ) -> List[Dict]: + """ + Get deduplication info for given frozen parts of a table + """ + self._ch_client.query( + TRUNCATE_TABLE_IF_EXISTS_SQL.format( + db_name=escape(self._backup_config["system_database"]), + table_name="_deduplication_info_current", + ) + ) + self._ch_client.query( + CREATE_IF_NOT_EXISTS_DEDUP_TABLE_CURRENT_SQL.format( + system_db=escape(self._backup_config["system_database"]) + ) + ) + + batch = [f"('{part.name}','{part.checksum}')" for part in frozen_parts.values()] + self._ch_client.query( + INSERT_DEDUP_INFO_BATCH_SQL.format( + system_db=escape(self._backup_config["system_database"]), + table="_deduplication_info_current", + batch=",".join(batch), + ), + log_entry_length=150, + ) + result_json = self._ch_client.query( + GET_DEDUPLICATED_PARTS_SQL.format( + system_db=escape(self._backup_config["system_database"]), + database=escape(database), + table=escape(table), + ) + ) + + return result_json["data"] + @staticmethod @contextmanager def _force_drop_table(): diff --git a/ch_backup/config.py b/ch_backup/config.py index 3eda1f2d..20c1c9f2 100644 --- a/ch_backup/config.py +++ b/ch_backup/config.py @@ -40,12 +40,14 @@ def _as_seconds(t: str) -> int: "clickhouse_password": None, }, "backup": { + "system_database": "_system", "exclude_dbs": [], "path_root": None, "deduplicate_parts": True, "deduplication_age_limit": { "days": 7, }, + "deduplication_batch_size": 500, "min_interval": { "minutes": 0, }, diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index 3950a590..1e46ce99 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -11,18 +11,13 @@ from typing import Dict, Iterable, List, Optional, Sequence, Tuple from ch_backup import logging -from ch_backup.backup.deduplication import ( - DatabaseDedupInfo, - DedupInfo, - TableDedupInfo, - deduplicate_part, -) +from ch_backup.backup.deduplication import deduplicate_parts from ch_backup.backup.metadata import PartMetadata, TableMetadata from ch_backup.backup.restore_context import PartState from ch_backup.backup_context import BackupContext from ch_backup.clickhouse.client import ClickhouseError from ch_backup.clickhouse.disks import ClickHouseTemporaryDisks -from ch_backup.clickhouse.models import Database, Table +from ch_backup.clickhouse.models import Database, FrozenPart, Table from ch_backup.clickhouse.schema import ( is_distributed, is_materialized_view, @@ -59,7 +54,6 @@ def backup( context: BackupContext, databases: Sequence[Database], db_tables: Dict[str, list], - dedup_info: DedupInfo, schema_only: bool, ) -> None: """ @@ -73,7 +67,6 @@ def backup( db, db_tables[db.name], backup_name, - dedup_info.database(db.name), schema_only, ) self._backup_cloud_storage_metadata(context) @@ -109,7 +102,6 @@ 
def _backup( db: Database, tables: Sequence[str], backup_name: str, - dedup_info: DatabaseDedupInfo, schema_only: bool, ) -> None: """ @@ -132,7 +124,6 @@ def _backup( table, backup_name, schema_only, - dedup_info.table(table.name), mtimes, ) @@ -306,7 +297,6 @@ def _backup_table( table: Table, backup_name: str, schema_only: bool, - dedup_info: TableDedupInfo, mtimes: Dict[str, TableMetadataMtime], ) -> None: """ @@ -362,18 +352,50 @@ def _backup_table( ) # Backup table data if not schema_only: - self._backup_frozen_table_data(context, table, backup_name, dedup_info) + self._backup_frozen_table_data(context, table, backup_name) def _backup_frozen_table_data( self, context: BackupContext, table: Table, backup_name: str, - dedup_info: TableDedupInfo, ) -> None: """ Backup table with data opposed to schema only. """ + + def deduplicate_parts_in_batch( + context: BackupContext, + upload_observer: UploadPartObserver, + frozen_parts: Dict[str, FrozenPart], + ) -> None: + logging.debug( + "Working on deduplication of {} frozen parts", len(frozen_parts) + ) + deduplicated_parts = deduplicate_parts( + context, table.database, table.name, frozen_parts + ) + logging.debug( + "{} out of {} parts are deduplicated", + len(deduplicated_parts), + len(frozen_parts), + ) + + for part_name in frozen_parts: + if part_name in deduplicated_parts: + context.ch_ctl.remove_freezed_part(frozen_parts[part_name]) + context.backup_meta.add_part(deduplicated_parts[part_name]) + else: + context.backup_layout.upload_data_part( + context.backup_meta.name, + frozen_parts[part_name], + partial( + upload_observer, + PartMetadata.from_frozen_part(frozen_parts[part_name]), + ), + ) + frozen_parts.clear() + if not is_merge_tree(table.engine): logging.info( 'Skipping table data backup for non MergeTree table "{}"."{}"', @@ -386,30 +408,24 @@ def _backup_frozen_table_data( upload_observer = UploadPartObserver(context) + frozen_parts_batch: Dict[str, FrozenPart] = {} + dedup_batch_size = context.config["deduplication_batch_size"] for data_path, disk in table.paths_with_disks: for fpart in context.ch_ctl.scan_frozen_parts( table, disk, data_path, backup_name ): logging.debug("Working on {}", fpart) - part = PartMetadata.from_frozen_part(fpart) - if disk.type == "s3": - context.backup_meta.add_part(part) + context.backup_meta.add_part(PartMetadata.from_frozen_part(fpart)) continue - # trying to find part in storage - deduplicated_part = deduplicate_part( - context.backup_layout, fpart, dedup_info - ) - if deduplicated_part: - context.ch_ctl.remove_freezed_part(fpart) - context.backup_meta.add_part(deduplicated_part) - else: - context.backup_layout.upload_data_part( - context.backup_meta.name, - fpart, - partial(upload_observer, part), + frozen_parts_batch[fpart.name] = fpart + if len(frozen_parts_batch) >= dedup_batch_size: + deduplicate_parts_in_batch( + context, upload_observer, frozen_parts_batch ) + if frozen_parts_batch: + deduplicate_parts_in_batch(context, upload_observer, frozen_parts_batch) context.backup_layout.wait() diff --git a/tests/integration/configuration.py b/tests/integration/configuration.py index d596f765..f0d5733a 100644 --- a/tests/integration/configuration.py +++ b/tests/integration/configuration.py @@ -52,6 +52,7 @@ def create(): "zk": zk, "ch_backup": { "encrypt_key": generate_random_string(32), + "system_database": "_system", }, "ch_version": os.getenv("CLICKHOUSE_VERSION"), "default_feature_flags": ["zookeeper"], diff --git a/tests/integration/features/deduplication.feature 
b/tests/integration/features/deduplication.feature index 0a54f1df..c110f370 100644 --- a/tests/integration/features/deduplication.feature +++ b/tests/integration/features/deduplication.feature @@ -52,6 +52,45 @@ Feature: Deduplication When we restore clickhouse backup #0 to clickhouse02 Then we got same clickhouse data at clickhouse01 clickhouse02 + Scenario: Create backup with deduplication in multiple batches + Given ch-backup configuration on clickhouse01 + """ + backup: + deduplication_batch_size: 10 + """ + And ch-backup configuration on clickhouse02 + """ + backup: + deduplication_batch_size: 10 + """ + And we have executed queries on clickhouse01 + """ + CREATE TABLE test_db1.test_table_batch (partition_id Int32, n Int32) + ENGINE = MergeTree() PARTITION BY partition_id ORDER BY (partition_id, n); + + INSERT INTO test_db1.test_table_batch SELECT number, -1 * number FROM system.numbers LIMIT 35; + """ + When we create clickhouse01 clickhouse backup + Then we got the following backups on clickhouse01 + | num | state | data_count | link_count | + | 0 | created | 38 | 2 | + | 1 | created | 2 | 0 | + When we restore clickhouse backup #0 to clickhouse02 + Then we got same clickhouse data at clickhouse01 clickhouse02 + + Given we have executed queries on clickhouse01 + """ + INSERT INTO test_db1.test_table_batch SELECT number + 50, number FROM system.numbers LIMIT 15; + """ + When we create clickhouse01 clickhouse backup + Then we got the following backups on clickhouse01 + | num | state | data_count | link_count | + | 0 | created | 15 | 40 | + | 1 | created | 38 | 2 | + | 2 | created | 2 | 0 | + When we restore clickhouse backup #0 to clickhouse02 + Then we got same clickhouse data at clickhouse01 clickhouse02 + Scenario Outline: Failed backups are used in deduplication Given we have executed queries on clickhouse01 """ diff --git a/tests/integration/modules/clickhouse.py b/tests/integration/modules/clickhouse.py index ff93da09..f0989dd9 100644 --- a/tests/integration/modules/clickhouse.py +++ b/tests/integration/modules/clickhouse.py @@ -59,6 +59,7 @@ def __init__( self._user = user self._password = password self._settings = context.clickhouse_settings + self._system_database = context.conf["ch_backup"]["system_database"] def ping(self) -> None: """ @@ -154,14 +155,14 @@ def get_table_schemas(self) -> dict: """ Retrieve DDL for user schemas. """ - query = """ + query = f""" SELECT database, name, create_table_query FROM system.tables WHERE database NOT IN ('system', '_temporary_and_external_tables', - 'information_schema', 'INFORMATION_SCHEMA') + 'information_schema', 'INFORMATION_SCHEMA', '{self._system_database}') FORMAT JSON """ tables = self._query("GET", query)["data"] @@ -176,11 +177,11 @@ def get_all_user_databases(self) -> Sequence[str]: """ Get user databases. 
""" - query = """ + query = f""" SELECT name FROM system.databases WHERE name NOT IN ('system', '_temporary_and_external_tables', - 'information_schema', 'INFORMATION_SCHEMA') + 'information_schema', 'INFORMATION_SCHEMA', '{self._system_database}') FORMAT JSONCompact """ @@ -251,7 +252,7 @@ def drop_test_table(self, db_num: int, table_num: int) -> None: self._query("POST", query) def _get_tables_for_data_comparisson(self) -> dict: - query = """ + query = f""" SELECT database, table, @@ -259,7 +260,7 @@ def _get_tables_for_data_comparisson(self) -> dict: FROM system.tables t JOIN system.columns c ON (t.database = c.database AND t.name = c.table) WHERE database NOT IN ('system', '_temporary_and_external_tables', - 'information_schema', 'INFORMATION_SCHEMA') + 'information_schema', 'INFORMATION_SCHEMA', '{self._system_database}') AND t.engine NOT IN ('View', 'MaterializedView', 'Distributed') GROUP BY database, table ORDER BY database, table diff --git a/tests/unit/test_backup_tables.py b/tests/unit/test_backup_tables.py index 4209bce6..50ce2304 100644 --- a/tests/unit/test_backup_tables.py +++ b/tests/unit/test_backup_tables.py @@ -3,7 +3,6 @@ import pytest -from ch_backup.backup.deduplication import DedupInfo from ch_backup.backup.metadata.backup_metadata import BackupMetadata from ch_backup.backup_context import BackupContext from ch_backup.clickhouse.models import Database, Table @@ -29,7 +28,6 @@ def test_backup_table_skipping_if_metadata_updated_during_backup( # Prepare involved data objects context = BackupContext(DEFAULT_CONFIG) # type: ignore[arg-type] db = Database(db_name, "MergeTree", "/var/lib/clickhouse/metadata/db1.sql") - dedup_info = DedupInfo() table_backup = TableBackup() backup_meta = BackupMetadata( name="20181017T210300", @@ -68,9 +66,7 @@ def test_backup_table_skipping_if_metadata_updated_during_backup( with patch("os.path.getmtime", side_effect=mtime), patch( "ch_backup.logic.table.Path", read_bytes=read_bytes_mock ): - table_backup.backup( - context, [db], {db_name: [table_name]}, dedup_info, schema_only=False - ) + table_backup.backup(context, [db], {db_name: [table_name]}, schema_only=False) assert len(context.backup_meta.get_tables(db_name)) == backups_expected assert clickhouse_ctl_mock.remove_freezed_data.call_count == 1 diff --git a/tests/unit/test_deduplication.py b/tests/unit/test_deduplication.py deleted file mode 100644 index 79449285..00000000 --- a/tests/unit/test_deduplication.py +++ /dev/null @@ -1,585 +0,0 @@ -""" -Unit tests for deduplication module. 
-""" - -from datetime import timedelta -from unittest.mock import MagicMock - -from tests.unit.utils import ( - assert_equal, - backup_metadata, - parametrize, - parts, - parts_dedup_info, -) - -from ch_backup.backup.deduplication import ( - DatabaseDedupInfo, - DedupInfo, - collect_dedup_info, - collect_dedup_references_for_batch_backup_deletion, -) -from ch_backup.backup.metadata import BackupState -from ch_backup.backup_context import BackupContext -from ch_backup.clickhouse.models import Database - - -@parametrize( - { - "id": "initial backup", - "args": { - "config": { - "backup": { - "deduplicate_parts": True, - "deduplication_age_limit": { - "days": 7, - }, - }, - }, - "databases": ["db1"], - "creating_backup": backup_metadata("new_backup", BackupState.CREATING), - "backups": [], - "result": DedupInfo(), - }, - }, - { - "id": "ordinary incremental backup", - "args": { - "config": { - "backup": { - "deduplicate_parts": True, - "deduplication_age_limit": { - "days": 7, - }, - }, - }, - "databases": ["db1"], - "creating_backup": backup_metadata("new_backup", BackupState.CREATING), - "backups": [ - backup_metadata( - name="backup1", - state=BackupState.CREATED, - age=timedelta(days=1), - databases={ - "db1": { - "tables": { - "table1": { - "engine": "MergeTree", - "parts": parts(1), - }, - }, - }, - }, - ), - ], - "result": DedupInfo( - { - "db1": DatabaseDedupInfo( - {"table1": parts_dedup_info("ch_backup/backup1", 1)} - ), - } - ), - }, - }, - { - "id": "deduplication disabled", - "args": { - "config": { - "backup": { - "deduplicate_parts": False, - }, - }, - "databases": ["db1"], - "creating_backup": backup_metadata("new_backup", BackupState.CREATING), - "backups": [ - backup_metadata( - name="backup1", - state=BackupState.CREATED, - age=timedelta(days=1), - databases={ - "db1": { - "tables": { - "table1": { - "engine": "MergeTree", - "parts": parts(1), - }, - }, - }, - }, - ), - ], - "result": DedupInfo(), - }, - }, - { - "id": "schema-only backup", - "args": { - "config": { - "backup": { - "deduplicate_parts": True, - "deduplication_age_limit": { - "days": 7, - }, - }, - }, - "databases": ["db1"], - "creating_backup": backup_metadata( - "new_backup", BackupState.CREATING, schema_only=True - ), - "backups": [ - backup_metadata( - name="backup1", - state=BackupState.CREATED, - age=timedelta(days=1), - databases={ - "db1": { - "tables": { - "table1": { - "engine": "MergeTree", - "parts": parts(1), - }, - }, - }, - }, - ), - ], - "result": DedupInfo(), - }, - }, - { - "id": "irrelevant parts of old backups are ignored", - "args": { - "config": { - "backup": { - "deduplicate_parts": True, - "deduplication_age_limit": { - "days": 7, - }, - }, - }, - "databases": ["db1"], - "creating_backup": backup_metadata("new_backup", BackupState.CREATING), - "backups": [ - backup_metadata( - name="backup2", - state=BackupState.CREATED, - age=timedelta(days=1), - databases={ - "db1": { - "tables": { - "table1": { - "engine": "MergeTree", - "parts": parts(1, link="ch_backup/backup1"), - }, - "table3": { - "engine": "ReplicatedMergeTree", - "parts": parts(1, link="ch_backup/backup1"), - }, - "table5": { - "engine": "ReplicatedMergeTree", - "parts": parts(1), - }, - }, - }, - }, - ), - backup_metadata( - name="backup1", - state=BackupState.CREATED, - age=timedelta(days=2), - databases={ - "db1": { - "tables": { - "table1": { - "engine": "MergeTree", - "parts": parts(1), - }, - "table2": { - "engine": "MergeTree", - "parts": parts(1), - }, - "table3": { - "engine": "ReplicatedMergeTree", - "parts": 
parts(1), - }, - "table4": { - "engine": "ReplicatedMergeTree", - "parts": parts(1), - }, - }, - }, - }, - ), - ], - "result": DedupInfo( - { - "db1": DatabaseDedupInfo( - { - "table1": parts_dedup_info( - "ch_backup/backup1", 1, verified=True - ), - "table3": parts_dedup_info( - "ch_backup/backup1", 1, verified=True - ), - "table5": parts_dedup_info("ch_backup/backup2", 1), - } - ), - } - ), - }, - }, - { - "id": "deduplication info is collected only for requested databases", - "args": { - "config": { - "backup": { - "deduplicate_parts": True, - "deduplication_age_limit": { - "days": 7, - }, - }, - }, - "databases": ["db1"], - "creating_backup": backup_metadata("new_backup", BackupState.CREATING), - "backups": [ - backup_metadata( - name="backup1", - state=BackupState.CREATED, - age=timedelta(days=1), - databases={ - "db1": { - "tables": { - "table1": { - "engine": "MergeTree", - "parts": parts(1), - }, - }, - }, - "db2": { - "tables": { - "table1": { - "engine": "MergeTree", - "parts": parts(1), - }, - }, - }, - }, - ), - ], - "result": DedupInfo( - { - "db1": DatabaseDedupInfo( - {"table1": parts_dedup_info("ch_backup/backup1", 1)} - ), - } - ), - }, - }, - { - "id": "parts of failed and partially deleted backups are used for deduplication", - "args": { - "config": { - "backup": { - "deduplicate_parts": True, - "deduplication_age_limit": { - "days": 7, - }, - }, - }, - "databases": ["db1", "db2"], - "creating_backup": backup_metadata("new_backup", BackupState.CREATING), - "backups": [ - backup_metadata( - name="backup3", - state=BackupState.FAILED, - age=timedelta(days=1), - databases={ - "db1": { - "tables": { - "table1": { - "engine": "MergeTree", - "parts": parts(1, link="ch_backup/backup1"), - }, - "table2": { - "engine": "MergeTree", - "parts": parts(1), - }, - }, - }, - }, - ), - backup_metadata( - name="backup2", - state=BackupState.CREATED, - age=timedelta(days=2), - databases={ - "db1": { - "tables": { - "table1": { - "engine": "MergeTree", - "parts": parts(1, link="ch_backup/backup1"), - }, - }, - }, - "db2": { - "tables": { - "table1": { - "engine": "MergeTree", - "parts": parts(1), - }, - }, - }, - }, - ), - backup_metadata( - name="backup1", - state=BackupState.PARTIALLY_DELETED, - age=timedelta(days=3), - databases={ - "db1": { - "tables": { - "table1": { - "engine": "MergeTree", - "parts": parts(1), - }, - }, - }, - }, - ), - ], - "result": DedupInfo( - { - "db1": DatabaseDedupInfo( - { - "table1": parts_dedup_info( - "ch_backup/backup1", 1, verified=True - ), - "table2": parts_dedup_info("ch_backup/backup3", 1), - } - ), - "db2": DatabaseDedupInfo( - { - "table1": parts_dedup_info("ch_backup/backup2", 1), - } - ), - } - ), - }, - }, - { - "id": "parts of backups that are out of deduction window are ignored", - "args": { - "config": { - "backup": { - "deduplicate_parts": True, - "deduplication_age_limit": { - "days": 7, - }, - }, - }, - "databases": ["db1"], - "creating_backup": backup_metadata("new_backup", BackupState.CREATING), - "backups": [ - backup_metadata( - name="backup2", - state=BackupState.CREATED, - age=timedelta(days=1), - databases={ - "db1": { - "tables": { - "table1": { - "engine": "MergeTree", - "parts": parts(1, link="ch_backup/backup1"), - }, - "table2": { - "engine": "MergeTree", - "parts": parts(1), - }, - }, - }, - }, - ), - backup_metadata( - name="backup1", - state=BackupState.CREATED, - age=timedelta(days=10), - databases={ - "db1": { - "tables": { - "table1": { - "engine": "MergeTree", - "parts": parts(1), - }, - }, - }, - }, - ), - ], - 
"result": DedupInfo( - { - "db1": DatabaseDedupInfo( - { - "table1": {}, - "table2": parts_dedup_info("ch_backup/backup2", 1), - } - ), - } - ), - }, - }, - { - "id": "only local backups are used for deduplicating parts of non-replicated tables", - "args": { - "config": { - "backup": { - "deduplicate_parts": True, - "deduplication_age_limit": { - "days": 7, - }, - }, - }, - "databases": ["db1"], - "creating_backup": backup_metadata( - "new_backup", BackupState.CREATING, hostname="host1" - ), - "backups": [ - backup_metadata( - name="backup2", - state=BackupState.CREATED, - age=timedelta(days=1), - hostname="host2", - databases={ - "db1": { - "tables": { - "replicated_table": { - "engine": "ReplicatedMergeTree", - "parts": parts(1, link="ch_backup/backup1"), - }, - "host2_table": { - "engine": "MergeTree", - "parts": parts(1), - }, - }, - }, - }, - ), - backup_metadata( - name="backup1", - state=BackupState.CREATED, - age=timedelta(days=2), - hostname="host1", - databases={ - "db1": { - "tables": { - "replicated_table": { - "engine": "ReplicatedMergeTree", - "parts": parts(1), - }, - "host1_table": { - "engine": "MergeTree", - "parts": parts(1), - }, - }, - }, - }, - ), - ], - "result": DedupInfo( - { - "db1": DatabaseDedupInfo( - { - "replicated_table": parts_dedup_info( - "ch_backup/backup1", 1, verified=True - ), - "host1_table": parts_dedup_info("ch_backup/backup1", 1), - } - ), - } - ), - }, - }, -) -def test_collect_dedup_info(config, creating_backup, databases, backups, result): - context = BackupContext(config) - context.backup_layout = layout_mock() - context.backup_meta = creating_backup - dbs = list(map(lambda db_name: Database(db_name, "", ""), databases)) - dedup_info = collect_dedup_info( - context=context, databases=dbs, backups_with_light_meta=backups - ) - assert_equal(dedup_info, result) - - -@parametrize( - { - "id": "single data part", - "args": { - "retained_backups_light_meta": [ - backup_metadata( - name="backup2", - state=BackupState.CREATED, - databases={ - "db1": { - "tables": { - "table1": { - "engine": "MergeTree", - "parts": parts(1, link="ch_backup/backup1"), - }, - }, - }, - }, - ), - ], - "deleting_backups_light_meta": [ - backup_metadata( - name="backup1", - state=BackupState.CREATED, - databases={ - "db1": { - "tables": { - "table1": { - "engine": "MergeTree", - "parts": parts(1), - }, - }, - }, - }, - ), - ], - "result": { - "backup1": { - "db1": { - "table1": {"part1"}, - }, - }, - }, - }, - } -) -def test_collect_dedup_references_for_batch_backup_deletion( - retained_backups_light_meta, deleting_backups_light_meta, result -): - layout = layout_mock() - retained_backups = [ - layout.reload_backup(backup_light, False) - for backup_light in retained_backups_light_meta - ] - deleting_backups = [ - layout.reload_backup(backup_light, False) - for backup_light in deleting_backups_light_meta - ] - - assert ( - collect_dedup_references_for_batch_backup_deletion( - layout=layout_mock(), - retained_backups_light_meta=retained_backups, - deleting_backups_light_meta=deleting_backups, - ) - == result - ) - - -def layout_mock(): - layout = MagicMock() - layout.reload_backup = lambda backup, use_light_meta: backup - return layout diff --git a/tests/unit/utils.py b/tests/unit/utils.py index aafd4063..8e680124 100644 --- a/tests/unit/utils.py +++ b/tests/unit/utils.py @@ -9,7 +9,6 @@ import pytest from deepdiff import DeepDiff -from ch_backup.backup.deduplication import PartDedupInfo from ch_backup.backup.metadata import BackupMetadata, BackupState from ch_backup.util 
import utcnow @@ -152,25 +151,6 @@ def parts(count: int, link: str = None) -> dict: return result -def parts_dedup_info(backup_path: str, count: int, verified: bool = False) -> dict: - """ - Build and return parts deduplication info. - """ - result = {} - for name, part in parts(count).items(): - result[name] = PartDedupInfo( - backup_path=backup_path, - checksum=part["checksum"], - size=part["bytes"], - files=part["files"], - tarball=True, - disk_name=part["disk_name"], - verified=verified, - ) - - return result - - def assert_equal(actual, expected): if actual != expected: ignore_type_in_groups = [(dict, defaultdict)]
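
Reviewer note, not part of the patch: a minimal standalone sketch of the batched insert flow that `_populate_dedup_info` and `ClickhouseCTL.insert_deduplication_info` implement above. `to_sql_tuple` mirrors `PartDedupInfo.to_sql` from the diff; `flush`, the sample part values, and the batch size of 3 are illustrative stand-ins (the patch defaults `deduplication_batch_size` to 500 and sends each batch as one INSERT into `_system`._deduplication_info).

```python
from typing import List


def to_sql_tuple(database: str, table: str, name: str, backup_path: str,
                 checksum: str, size: int, files: List[str],
                 tarball: bool, disk_name: str, verified: bool) -> str:
    # Same rendering as PartDedupInfo.to_sql(): one VALUES tuple per part,
    # columns in the order of the _deduplication_info table definition.
    files_array = "[" + ",".join(f"'{f}'" for f in files) + "]"
    return (
        f"('{database}','{table}','{name}','{backup_path}','{checksum}',"
        f"{size},{files_array},{int(tarball)},'{disk_name}',{int(verified)})"
    )


def flush(batch: List[str]) -> None:
    # Stand-in for ch_ctl.insert_deduplication_info(batch): one INSERT per batch.
    print(f"INSERT INTO `_system`._deduplication_info VALUES {','.join(batch)}")


batch: List[str] = []
batch_size = 3  # illustrative; the patch reads config["deduplication_batch_size"]

for i in range(7):
    batch.append(to_sql_tuple("db1", "table1", f"part{i}", "ch_backup/backup1",
                              f"checksum{i}", 1024, ["data.bin"], True,
                              "default", False))
    if len(batch) >= batch_size:
        flush(batch)
        batch.clear()

if batch:
    flush(batch)  # flush the tail batch, as _populate_dedup_info does
```

Lookup then runs in the opposite direction: `get_deduplication_info` loads the frozen parts' `(name, checksum)` pairs into `_deduplication_info_current` and joins that table against `_deduplication_info`, so only parts with a matching name and checksum come back to `deduplicate_parts` as reusable links.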